You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2016/04/29 10:52:49 UTC

[1/2] camel git commit: CAMEL-9823: Exploring Consumer groups feature in Camel-kafka consumer side. Thanks to Anbumani Balusamy for the patch.

Repository: camel
Updated Branches:
  refs/heads/master 5fee9dd8e -> 1eeba05d4


CAMEL-9823: Explore the consumer groups feature on the camel-kafka consumer side. Thanks to Anbumani Balusamy for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a107781b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a107781b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a107781b

Branch: refs/heads/master
Commit: a107781bab8c95e03e31a2c5824381d9ea78efeb
Parents: 5fee9dd
Author: Andrea Cosentino <an...@gmail.com>
Authored: Fri Apr 29 10:40:17 2016 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Fri Apr 29 10:51:36 2016 +0200

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaConsumer.java    | 54 ++++++++++++--------
 1 file changed, 33 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a107781b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 7733231..82600e7 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -17,6 +17,8 @@
 package org.apache.camel.component.kafka;
 
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 
@@ -26,6 +28,8 @@ import org.apache.camel.impl.DefaultConsumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,7 +94,7 @@ public class KafkaConsumer extends DefaultConsumer {
         private final String threadId;
         private final Properties kafkaProps;
 
-        KafkaFetchRecords(String topicName, String id, Properties kafkaProps) {
+        public KafkaFetchRecords(String topicName, String id, Properties kafkaProps) {
             this.topicName = topicName;
             this.threadId = topicName + "-" + "Thread " + id;
             this.kafkaProps = kafkaProps;
@@ -112,26 +116,34 @@ public class KafkaConsumer extends DefaultConsumer {
                     consumer.seekToBeginning();
                 }
                 while (isRunAllowed() && !isSuspendingOrSuspended()) {
-                    ConsumerRecords<Object, Object> records = consumer.poll(Long.MAX_VALUE);
-                    for (ConsumerRecord<Object, Object> record : records) {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value());
-                        }
-                        Exchange exchange = endpoint.createKafkaExchange(record);
-                        try {
-                            processor.process(exchange);
-                        } catch (Exception e) {
-                            getExceptionHandler().handleException("Error during processing", exchange, e);
-                        }
-                        processed++;
-                        // if autocommit is false
-                        if (endpoint.isAutoCommitEnable() != null && !endpoint.isAutoCommitEnable()) {
-                            if (processed >= endpoint.getBatchSize()) {
-                                consumer.commitSync();
-                                processed = 0;
-                            }
-                        }
-                    }
+                    ConsumerRecords<Object, Object> allRecords = consumer.poll(Long.MAX_VALUE);
+                    // START : CAMEL-9823
+					for (TopicPartition partition : allRecords.partitions()) {
+						List<ConsumerRecord<Object, Object>> partitionRecords = allRecords
+								.records(partition);
+	                    for (ConsumerRecord<Object, Object> record : partitionRecords) {
+	                        if (LOG.isTraceEnabled()) {
+	                            LOG.trace("partition = {}, offset = {}, key = {}, value = {}", record.partition(), record.offset(), record.key(), record.value());
+	                        }
+	                        Exchange exchange = endpoint.createKafkaExchange(record);
+	                        try {
+	                            processor.process(exchange);
+	                        } catch (Exception e) {
+	                            getExceptionHandler().handleException("Error during processing", exchange, e);
+	                        }
+	                    }
+						// if autocommit is false
+						if (endpoint.isAutoCommitEnable() != null
+								&& !endpoint.isAutoCommitEnable()) {
+							long partitionLastoffset = partitionRecords.get(
+									partitionRecords.size() - 1).offset();
+							consumer.commitSync(Collections.singletonMap(
+									partition, new OffsetAndMetadata(
+											partitionLastoffset + 1)));
+						}
+	                    
+					}
+					// END : CAMEL-9823
                 }
                 LOG.debug("Unsubscribing {} from topic {}", threadId, topicName);
                 consumer.unsubscribe();


[2/2] camel git commit: Fixed CS

Posted by ac...@apache.org.
Fixed CS (Checkstyle violations)


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1eeba05d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1eeba05d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1eeba05d

Branch: refs/heads/master
Commit: 1eeba05d48a0ea12cc8bb4741bb7163d73281022
Parents: a107781
Author: Andrea Cosentino <an...@gmail.com>
Authored: Fri Apr 29 10:50:57 2016 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Fri Apr 29 10:52:24 2016 +0200

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaConsumer.java    | 51 +++++++++-----------
 1 file changed, 23 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1eeba05d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 82600e7..8649a46 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -94,7 +94,7 @@ public class KafkaConsumer extends DefaultConsumer {
         private final String threadId;
         private final Properties kafkaProps;
 
-        public KafkaFetchRecords(String topicName, String id, Properties kafkaProps) {
+        KafkaFetchRecords(String topicName, String id, Properties kafkaProps) {
             this.topicName = topicName;
             this.threadId = topicName + "-" + "Thread " + id;
             this.kafkaProps = kafkaProps;
@@ -117,33 +117,28 @@ public class KafkaConsumer extends DefaultConsumer {
                 }
                 while (isRunAllowed() && !isSuspendingOrSuspended()) {
                     ConsumerRecords<Object, Object> allRecords = consumer.poll(Long.MAX_VALUE);
-                    // START : CAMEL-9823
-					for (TopicPartition partition : allRecords.partitions()) {
-						List<ConsumerRecord<Object, Object>> partitionRecords = allRecords
-								.records(partition);
-	                    for (ConsumerRecord<Object, Object> record : partitionRecords) {
-	                        if (LOG.isTraceEnabled()) {
-	                            LOG.trace("partition = {}, offset = {}, key = {}, value = {}", record.partition(), record.offset(), record.key(), record.value());
-	                        }
-	                        Exchange exchange = endpoint.createKafkaExchange(record);
-	                        try {
-	                            processor.process(exchange);
-	                        } catch (Exception e) {
-	                            getExceptionHandler().handleException("Error during processing", exchange, e);
-	                        }
-	                    }
-						// if autocommit is false
-						if (endpoint.isAutoCommitEnable() != null
-								&& !endpoint.isAutoCommitEnable()) {
-							long partitionLastoffset = partitionRecords.get(
-									partitionRecords.size() - 1).offset();
-							consumer.commitSync(Collections.singletonMap(
-									partition, new OffsetAndMetadata(
-											partitionLastoffset + 1)));
-						}
-	                    
-					}
-					// END : CAMEL-9823
+                    for (TopicPartition partition : allRecords.partitions()) {
+                        List<ConsumerRecord<Object, Object>> partitionRecords = allRecords
+                            .records(partition);
+                        for (ConsumerRecord<Object, Object> record : partitionRecords) {
+                            if (LOG.isTraceEnabled()) {
+                                LOG.trace("partition = {}, offset = {}, key = {}, value = {}", record.partition(), record.offset(), record.key(), record.value());
+                            }
+                            Exchange exchange = endpoint.createKafkaExchange(record);
+                            try {
+                                processor.process(exchange);
+                            } catch (Exception e) {
+                                getExceptionHandler().handleException("Error during processing", exchange, e);
+                            }
+                        }
+                        // if autocommit is false
+                        if (endpoint.isAutoCommitEnable() != null
+                            && !endpoint.isAutoCommitEnable()) {
+                            long partitionLastoffset = partitionRecords.get(partitionRecords.size() - 1).offset();
+                            consumer.commitSync(Collections.singletonMap(
+                                partition, new OffsetAndMetadata(partitionLastoffset + 1)));
+                        }
+                    }
                 }
                 LOG.debug("Unsubscribing {} from topic {}", threadId, topicName);
                 consumer.unsubscribe();