Posted to commits@camel.apache.org by da...@apache.org on 2015/08/07 16:49:38 UTC

[1/4] camel git commit: Polished

Repository: camel
Updated Branches:
  refs/heads/camel-2.15.x 990c6b579 -> 654db0d39
  refs/heads/master 91ff7d0b6 -> 805db35b4


Polished


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e09c51d2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e09c51d2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e09c51d2

Branch: refs/heads/master
Commit: e09c51d2d9d7a1d71bde9a5a626f3e86153748f2
Parents: 91ff7d0
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Aug 7 16:39:42 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Aug 7 16:39:42 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/camel/component/kafka/KafkaConsumer.java | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e09c51d2/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 1657fa7..cb2dc9d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -121,11 +121,11 @@ public class KafkaConsumer extends DefaultConsumer {
     class BatchingConsumerTask implements Runnable {
 
         private KafkaStream<byte[], byte[]> stream;
-        private CyclicBarrier berrier;
+        private CyclicBarrier barrier;
 
-        public BatchingConsumerTask(KafkaStream<byte[], byte[]> stream, CyclicBarrier berrier) {
+        public BatchingConsumerTask(KafkaStream<byte[], byte[]> stream, CyclicBarrier barrier) {
             this.stream = stream;
-            this.berrier = berrier;
+            this.barrier = barrier;
         }
 
         public void run() {
@@ -160,7 +160,7 @@ public class KafkaConsumer extends DefaultConsumer {
                 if (processed >= endpoint.getBatchSize() || consumerTimeout 
                     || (processed > 0 && !hasNext)) { // Need to commit the offset for the last round
                     try {
-                        berrier.await(endpoint.getBarrierAwaitTimeoutMs(), TimeUnit.MILLISECONDS);
+                        barrier.await(endpoint.getBarrierAwaitTimeoutMs(), TimeUnit.MILLISECONDS);
                         if (!consumerTimeout) {
                             processed = 0;
                         }
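
A note on the pattern the rename touches: in the manual-commit path each consumer
stream thread processes a batch and then waits on a shared CyclicBarrier, so the
offsets are committed once per round after every stream has arrived. Below is a
minimal standalone sketch of that coordination, assuming the barrier action is the
offset commit (the CommitOffsetTask in the later commit is the natural fit) and
with the Kafka calls simulated by println; it is an illustration, not the Camel code.

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BarrierCommitSketch {

    static final int STREAMS = 2;     // number of consumer stream threads
    static final int BATCH_SIZE = 5;  // messages processed per commit round
    static final int ROUNDS = 3;

    public static void main(String[] args) throws Exception {
        // the barrier action runs once per round, after every stream thread has
        // finished its batch; in the real consumer this is where
        // consumer.commitOffsets() would be invoked
        CyclicBarrier barrier = new CyclicBarrier(STREAMS,
                () -> System.out.println("commitOffsets() for this round"));

        ExecutorService executor = Executors.newFixedThreadPool(STREAMS);
        for (int i = 0; i < STREAMS; i++) {
            final int id = i;
            executor.submit(() -> {
                for (int round = 0; round < ROUNDS; round++) {
                    for (int msg = 0; msg < BATCH_SIZE; msg++) {
                        System.out.println("stream " + id + " processed message " + msg);
                    }
                    try {
                        // wait for the other streams; the timeout keeps one
                        // stalled stream from blocking the commit forever
                        barrier.await(5000, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            });
        }
        executor.shutdown();
        executor.awaitTermination(30, TimeUnit.SECONDS);
    }
}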


[3/4] camel git commit: Polished

Posted by da...@apache.org.
Polished


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/38f9d3bc
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/38f9d3bc
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/38f9d3bc

Branch: refs/heads/camel-2.15.x
Commit: 38f9d3bc8955fcee6010d12e093b63461b1ed504
Parents: 990c6b5
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Aug 7 16:39:42 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Aug 7 16:57:07 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/camel/component/kafka/KafkaConsumer.java | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/38f9d3bc/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 1657fa7..cb2dc9d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -121,11 +121,11 @@ public class KafkaConsumer extends DefaultConsumer {
     class BatchingConsumerTask implements Runnable {
 
         private KafkaStream<byte[], byte[]> stream;
-        private CyclicBarrier berrier;
+        private CyclicBarrier barrier;
 
-        public BatchingConsumerTask(KafkaStream<byte[], byte[]> stream, CyclicBarrier berrier) {
+        public BatchingConsumerTask(KafkaStream<byte[], byte[]> stream, CyclicBarrier barrier) {
             this.stream = stream;
-            this.berrier = berrier;
+            this.barrier = barrier;
         }
 
         public void run() {
@@ -160,7 +160,7 @@ public class KafkaConsumer extends DefaultConsumer {
                 if (processed >= endpoint.getBatchSize() || consumerTimeout 
                     || (processed > 0 && !hasNext)) { // Need to commit the offset for the last round
                     try {
-                        berrier.await(endpoint.getBarrierAwaitTimeoutMs(), TimeUnit.MILLISECONDS);
+                        barrier.await(endpoint.getBarrierAwaitTimeoutMs(), TimeUnit.MILLISECONDS);
                         if (!consumerTimeout) {
                             processed = 0;
                         }


[4/4] camel git commit: CAMEL-8653: camel-kafka consumer should commit the offset on the consumer when it's no longer running, such as when suspending or shutting down.

Posted by da...@apache.org.
CAMEL-8653: camel-kafka consumer should commit the offset on the consumer when it's no longer running, such as when suspending or shutting down.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/654db0d3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/654db0d3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/654db0d3

Branch: refs/heads/camel-2.15.x
Commit: 654db0d39c755101afe81e0169c386b7f4f029c3
Parents: 38f9d3b
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Aug 7 16:56:36 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Aug 7 16:57:13 2015 +0200

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaConsumer.java    | 24 ++++++++++++++++----
 1 file changed, 19 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/654db0d3/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index cb2dc9d..94893fb 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -33,6 +33,7 @@ import kafka.message.MessageAndMetadata;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,6 +73,7 @@ public class KafkaConsumer extends DefaultConsumer {
     protected void doStart() throws Exception {
         super.doStart();
         log.info("Starting Kafka consumer");
+
         executor = endpoint.createExecutor();
         for (int i = 0; i < endpoint.getConsumersCount(); i++) {
             ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(getProps()));
@@ -79,8 +81,10 @@ public class KafkaConsumer extends DefaultConsumer {
             topicCountMap.put(endpoint.getTopic(), endpoint.getConsumerStreams());
             Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
             List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(endpoint.getTopic());
+
+            // commit periodically
             if (endpoint.isAutoCommitEnable() != null && !endpoint.isAutoCommitEnable()) {
-                if ((endpoint.getConsumerTimeoutMs() == null || endpoint.getConsumerTimeoutMs().intValue() < 0)
+                if ((endpoint.getConsumerTimeoutMs() == null || endpoint.getConsumerTimeoutMs() < 0)
                         && endpoint.getConsumerStreams() > 1) {
                     LOG.warn("consumerTimeoutMs is set to -1 (infinite) while requested multiple consumer streams.");
                 }
@@ -90,8 +94,9 @@ public class KafkaConsumer extends DefaultConsumer {
                 }
                 consumerBarriers.put(consumer, barrier);
             } else {
+                // auto commit
                 for (final KafkaStream<byte[], byte[]> stream : streams) {
-                    executor.submit(new AutoCommitConsumerTask(stream));
+                    executor.submit(new AutoCommitConsumerTask(consumer, stream));
                 }
                 consumerBarriers.put(consumer, null);
             }
@@ -103,11 +108,14 @@ public class KafkaConsumer extends DefaultConsumer {
     protected void doStop() throws Exception {
         super.doStop();
         log.info("Stopping Kafka consumer");
+
         for (ConsumerConnector consumer : consumerBarriers.keySet()) {
             if (consumer != null) {
                 consumer.shutdown();
             }
         }
+        consumerBarriers.clear();
+
         if (executor != null) {
             if (getEndpoint() != null && getEndpoint().getCamelContext() != null) {
                 getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
@@ -175,7 +183,7 @@ public class KafkaConsumer extends DefaultConsumer {
 
     class CommitOffsetTask implements Runnable {
 
-        private ConsumerConnector consumer;
+        private final ConsumerConnector consumer;
 
         public CommitOffsetTask(ConsumerConnector consumer) {
             this.consumer = consumer;
@@ -183,22 +191,25 @@ public class KafkaConsumer extends DefaultConsumer {
 
         @Override
         public void run() {
+            LOG.debug("Commit offsets on consumer: {}", ObjectHelper.getIdentityHashCode(consumer));
             consumer.commitOffsets();
         }
     }
 
     class AutoCommitConsumerTask implements Runnable {
 
+        private final ConsumerConnector consumer;
         private KafkaStream<byte[], byte[]> stream;
 
-        public AutoCommitConsumerTask(KafkaStream<byte[], byte[]> stream) {
+        public AutoCommitConsumerTask(ConsumerConnector consumer, KafkaStream<byte[], byte[]> stream) {
+            this.consumer = consumer;
             this.stream = stream;
         }
 
         public void run() {
             ConsumerIterator<byte[], byte[]> it = stream.iterator();
             // only poll the next message if we are allowed to run and are not suspending
-            while (isRunAllowed() && it.hasNext()) {
+            while (isRunAllowed() && !isSuspendingOrSuspended() && it.hasNext()) {
                 MessageAndMetadata<byte[], byte[]> mm = it.next();
                 Exchange exchange = endpoint.createKafkaExchange(mm);
                 try {
@@ -207,6 +218,9 @@ public class KafkaConsumer extends DefaultConsumer {
                     getExceptionHandler().handleException("Error during processing", exchange, e);
                 }
             }
+            // no more data so commit offset
+            LOG.debug("Commit offsets on consumer: {}", ObjectHelper.getIdentityHashCode(consumer));
+            consumer.commitOffsets();
         }
     }
 }
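
The gist of CAMEL-8653, then: the auto-commit task keeps a handle on its
ConsumerConnector, stops polling as soon as the consumer is suspending or shutting
down, and commits the current offset once the loop exits so already-processed
messages are not redelivered after a restart. A stripped-down sketch of that shape,
with plain flags and an iterator standing in for Camel's lifecycle checks and the
Kafka stream (the names here are illustrative, not Camel or Kafka API):

import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;

public class SuspendAwareConsumerSketch {

    // stand-ins for DefaultConsumer.isRunAllowed() / isSuspendingOrSuspended()
    static final AtomicBoolean runAllowed = new AtomicBoolean(true);
    static final AtomicBoolean suspending = new AtomicBoolean(false);

    // stand-in for ConsumerConnector.commitOffsets()
    static void commitOffsets() {
        System.out.println("commitOffsets()");
    }

    public static void main(String[] args) {
        Iterator<String> stream = Arrays.asList("m1", "m2", "m3").iterator();

        // only poll the next message while we are allowed to run and not suspending
        while (runAllowed.get() && !suspending.get() && stream.hasNext()) {
            String message = stream.next();
            try {
                System.out.println("processing " + message);
            } catch (Exception e) {
                System.err.println("error during processing: " + e);
            }
        }

        // loop exited (no more data, or suspend/stop requested):
        // commit what has been consumed so the offset is not lost
        commitOffsets();
    }
}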


[2/4] camel git commit: CAMEL-8653: camel-kafka consumer should commit the offset on the consumer when it's no longer running, such as when suspending or shutting down.

Posted by da...@apache.org.
CAMEL-8653: camel-kafka consumer should commit the offset on the consumer when it's no longer running, such as when suspending or shutting down.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/805db35b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/805db35b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/805db35b

Branch: refs/heads/master
Commit: 805db35b482b0f3b65df80906e1d42dcea1b3c9e
Parents: e09c51d
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Aug 7 16:56:36 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Aug 7 16:56:36 2015 +0200

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaConsumer.java    | 24 ++++++++++++++++----
 1 file changed, 19 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/805db35b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index cb2dc9d..94893fb 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -33,6 +33,7 @@ import kafka.message.MessageAndMetadata;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,6 +73,7 @@ public class KafkaConsumer extends DefaultConsumer {
     protected void doStart() throws Exception {
         super.doStart();
         log.info("Starting Kafka consumer");
+
         executor = endpoint.createExecutor();
         for (int i = 0; i < endpoint.getConsumersCount(); i++) {
             ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(getProps()));
@@ -79,8 +81,10 @@ public class KafkaConsumer extends DefaultConsumer {
             topicCountMap.put(endpoint.getTopic(), endpoint.getConsumerStreams());
             Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
             List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(endpoint.getTopic());
+
+            // commit periodically
             if (endpoint.isAutoCommitEnable() != null && !endpoint.isAutoCommitEnable()) {
-                if ((endpoint.getConsumerTimeoutMs() == null || endpoint.getConsumerTimeoutMs().intValue() < 0)
+                if ((endpoint.getConsumerTimeoutMs() == null || endpoint.getConsumerTimeoutMs() < 0)
                         && endpoint.getConsumerStreams() > 1) {
                     LOG.warn("consumerTimeoutMs is set to -1 (infinite) while requested multiple consumer streams.");
                 }
@@ -90,8 +94,9 @@ public class KafkaConsumer extends DefaultConsumer {
                 }
                 consumerBarriers.put(consumer, barrier);
             } else {
+                // auto commit
                 for (final KafkaStream<byte[], byte[]> stream : streams) {
-                    executor.submit(new AutoCommitConsumerTask(stream));
+                    executor.submit(new AutoCommitConsumerTask(consumer, stream));
                 }
                 consumerBarriers.put(consumer, null);
             }
@@ -103,11 +108,14 @@ public class KafkaConsumer extends DefaultConsumer {
     protected void doStop() throws Exception {
         super.doStop();
         log.info("Stopping Kafka consumer");
+
         for (ConsumerConnector consumer : consumerBarriers.keySet()) {
             if (consumer != null) {
                 consumer.shutdown();
             }
         }
+        consumerBarriers.clear();
+
         if (executor != null) {
             if (getEndpoint() != null && getEndpoint().getCamelContext() != null) {
                 getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
@@ -175,7 +183,7 @@ public class KafkaConsumer extends DefaultConsumer {
 
     class CommitOffsetTask implements Runnable {
 
-        private ConsumerConnector consumer;
+        private final ConsumerConnector consumer;
 
         public CommitOffsetTask(ConsumerConnector consumer) {
             this.consumer = consumer;
@@ -183,22 +191,25 @@ public class KafkaConsumer extends DefaultConsumer {
 
         @Override
         public void run() {
+            LOG.debug("Commit offsets on consumer: {}", ObjectHelper.getIdentityHashCode(consumer));
             consumer.commitOffsets();
         }
     }
 
     class AutoCommitConsumerTask implements Runnable {
 
+        private final ConsumerConnector consumer;
         private KafkaStream<byte[], byte[]> stream;
 
-        public AutoCommitConsumerTask(KafkaStream<byte[], byte[]> stream) {
+        public AutoCommitConsumerTask(ConsumerConnector consumer, KafkaStream<byte[], byte[]> stream) {
+            this.consumer = consumer;
             this.stream = stream;
         }
 
         public void run() {
             ConsumerIterator<byte[], byte[]> it = stream.iterator();
             // only poll the next message if we are allowed to run and are not suspending
-            while (isRunAllowed() && it.hasNext()) {
+            while (isRunAllowed() && !isSuspendingOrSuspended() && it.hasNext()) {
                 MessageAndMetadata<byte[], byte[]> mm = it.next();
                 Exchange exchange = endpoint.createKafkaExchange(mm);
                 try {
@@ -207,6 +218,9 @@ public class KafkaConsumer extends DefaultConsumer {
                     getExceptionHandler().handleException("Error during processing", exchange, e);
                 }
             }
+            // no more data so commit offset
+            LOG.debug("Commit offsets on consumer: {}", ObjectHelper.getIdentityHashCode(consumer));
+            consumer.commitOffsets();
         }
     }
 }