You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/12/02 04:40:20 UTC

[03/11] camel git commit: CAMEL-8085 Make barrier await timeout configurable

CAMEL-8085 Make barrier await timeout configurable


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/257ef8f6
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/257ef8f6
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/257ef8f6

Branch: refs/heads/master
Commit: 257ef8f6c74c054ca368043c5f228ff432bb15f1
Parents: 0f7ed7b
Author: Ivan Vasylyev <va...@gmail.com>
Authored: Mon Dec 1 14:31:39 2014 +0200
Committer: Willem Jiang <wi...@gmail.com>
Committed: Tue Dec 2 10:53:10 2014 +0800

----------------------------------------------------------------------
 .../apache/camel/component/kafka/KafkaConfiguration.java |  9 +++++++++
 .../org/apache/camel/component/kafka/KafkaConsumer.java  | 11 +++--------
 .../org/apache/camel/component/kafka/KafkaEndpoint.java  |  8 ++++++++
 3 files changed, 20 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/257ef8f6/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 7d59dd2..669e1a0 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -30,6 +30,7 @@ public class KafkaConfiguration {
     private int consumerStreams = 10;
     private int consumersCount = 1;
     private int batchSize = 100;
+    private int barrierAwaitTimeoutMs = 10000;
 
     //Common configuration properties
     private String clientId;
@@ -207,6 +208,14 @@ public class KafkaConfiguration {
         this.batchSize = batchSize;
     }
 
+    public int getBarrierAwaitTimeoutMs() {
+        return barrierAwaitTimeoutMs;
+    }
+
+    public void setBarrierAwaitTimeoutMs(int barrierAwaitTimeoutMs) {
+        this.barrierAwaitTimeoutMs = barrierAwaitTimeoutMs;
+    }
+
     public int getConsumersCount() {
         return consumersCount;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/257ef8f6/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 2711bfa..1813376 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -17,7 +17,6 @@
 package org.apache.camel.component.kafka;
 
 import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.message.MessageAndMetadata;
@@ -123,10 +122,8 @@ public class KafkaConsumer extends DefaultConsumer {
         }
 
         public void run() {
-            ConsumerIterator<byte[], byte[]> it = stream.iterator();
             int processed = 0;
-            while (it.hasNext()) {
-                MessageAndMetadata<byte[], byte[]> mm = it.next();
+            for (MessageAndMetadata<byte[], byte[]> mm : stream) {
                 Exchange exchange = endpoint.createKafkaExchange(mm);
                 try {
                     processor.process(exchange);
@@ -136,7 +133,7 @@ public class KafkaConsumer extends DefaultConsumer {
                 processed++;
                 if (processed >= endpoint.getBatchSize()) {
                     try {
-                        berrier.await(10, TimeUnit.SECONDS);
+                        berrier.await(endpoint.getBarrierAwaitTimeoutMs(), TimeUnit.MILLISECONDS);
                         processed = 0;
                     } catch (InterruptedException e) {
                         LOG.error(e.getMessage(), e);
@@ -175,9 +172,7 @@ public class KafkaConsumer extends DefaultConsumer {
         }
 
         public void run() {
-            ConsumerIterator<byte[], byte[]> it = stream.iterator();
-            while (it.hasNext()) {
-                MessageAndMetadata<byte[], byte[]> mm = it.next();
+            for (MessageAndMetadata<byte[], byte[]> mm : stream) {
                 Exchange exchange = endpoint.createKafkaExchange(mm);
                 try {
                     processor.process(exchange);

http://git-wip-us.apache.org/repos/asf/camel/blob/257ef8f6/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 1ca50d9..d32cb83 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -170,6 +170,14 @@ public class KafkaEndpoint extends DefaultEndpoint {
         this.configuration.setBatchSize(batchSize);
     }
 
+    public int getBarrierAwaitTimeoutMs() {
+        return configuration.getBarrierAwaitTimeoutMs();
+    }
+
+    public void setBarrierAwaitTimeoutMs(int barrierAwaitTimeoutMs) {
+        this.configuration.setBarrierAwaitTimeoutMs(barrierAwaitTimeoutMs);
+    }
+
     public int getConsumersCount() {
         return this.configuration.getConsumersCount();
     }