You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/12/02 04:40:20 UTC
[03/11] camel git commit: CAMEL-8085 Make barrier await timeout configurable
CAMEL-8085 Make barrier await timeout configurable
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/257ef8f6
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/257ef8f6
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/257ef8f6
Branch: refs/heads/master
Commit: 257ef8f6c74c054ca368043c5f228ff432bb15f1
Parents: 0f7ed7b
Author: Ivan Vasylyev <va...@gmail.com>
Authored: Mon Dec 1 14:31:39 2014 +0200
Committer: Willem Jiang <wi...@gmail.com>
Committed: Tue Dec 2 10:53:10 2014 +0800
----------------------------------------------------------------------
.../apache/camel/component/kafka/KafkaConfiguration.java | 9 +++++++++
.../org/apache/camel/component/kafka/KafkaConsumer.java | 11 +++--------
.../org/apache/camel/component/kafka/KafkaEndpoint.java | 8 ++++++++
3 files changed, 20 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/257ef8f6/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 7d59dd2..669e1a0 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -30,6 +30,7 @@ public class KafkaConfiguration {
private int consumerStreams = 10;
private int consumersCount = 1;
private int batchSize = 100;
+ private int barrierAwaitTimeoutMs = 10000;
//Common configuration properties
private String clientId;
@@ -207,6 +208,14 @@ public class KafkaConfiguration {
this.batchSize = batchSize;
}
+ public int getBarrierAwaitTimeoutMs() {
+ return barrierAwaitTimeoutMs;
+ }
+
+ public void setBarrierAwaitTimeoutMs(int barrierAwaitTimeoutMs) {
+ this.barrierAwaitTimeoutMs = barrierAwaitTimeoutMs;
+ }
+
public int getConsumersCount() {
return consumersCount;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/257ef8f6/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 2711bfa..1813376 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -17,7 +17,6 @@
package org.apache.camel.component.kafka;
import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
@@ -123,10 +122,8 @@ public class KafkaConsumer extends DefaultConsumer {
}
public void run() {
- ConsumerIterator<byte[], byte[]> it = stream.iterator();
int processed = 0;
- while (it.hasNext()) {
- MessageAndMetadata<byte[], byte[]> mm = it.next();
+ for (MessageAndMetadata<byte[], byte[]> mm : stream) {
Exchange exchange = endpoint.createKafkaExchange(mm);
try {
processor.process(exchange);
@@ -136,7 +133,7 @@ public class KafkaConsumer extends DefaultConsumer {
processed++;
if (processed >= endpoint.getBatchSize()) {
try {
- berrier.await(10, TimeUnit.SECONDS);
+ berrier.await(endpoint.getBarrierAwaitTimeoutMs(), TimeUnit.MILLISECONDS);
processed = 0;
} catch (InterruptedException e) {
LOG.error(e.getMessage(), e);
@@ -175,9 +172,7 @@ public class KafkaConsumer extends DefaultConsumer {
}
public void run() {
- ConsumerIterator<byte[], byte[]> it = stream.iterator();
- while (it.hasNext()) {
- MessageAndMetadata<byte[], byte[]> mm = it.next();
+ for (MessageAndMetadata<byte[], byte[]> mm : stream) {
Exchange exchange = endpoint.createKafkaExchange(mm);
try {
processor.process(exchange);
http://git-wip-us.apache.org/repos/asf/camel/blob/257ef8f6/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 1ca50d9..d32cb83 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -170,6 +170,14 @@ public class KafkaEndpoint extends DefaultEndpoint {
this.configuration.setBatchSize(batchSize);
}
+ public int getBarrierAwaitTimeoutMs() {
+ return configuration.getBarrierAwaitTimeoutMs();
+ }
+
+ public void setBarrierAwaitTimeoutMs(int barrierAwaitTimeoutMs) {
+ this.configuration.setBarrierAwaitTimeoutMs(barrierAwaitTimeoutMs);
+ }
+
public int getConsumersCount() {
return this.configuration.getConsumersCount();
}