You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2016/04/29 10:52:49 UTC
[1/2] camel git commit: CAMEL-9823: Exploring Consumer groups feature
in Camel-kafka consumer side. Thanks to Anbumani Balusamy for the patch.
Repository: camel
Updated Branches:
refs/heads/master 5fee9dd8e -> 1eeba05d4
CAMEL-9823: Exploring Consumer groups feature in Camel-kafka consumer side. Thanks to Anbumani Balusamy for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a107781b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a107781b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a107781b
Branch: refs/heads/master
Commit: a107781bab8c95e03e31a2c5824381d9ea78efeb
Parents: 5fee9dd
Author: Andrea Cosentino <an...@gmail.com>
Authored: Fri Apr 29 10:40:17 2016 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Fri Apr 29 10:51:36 2016 +0200
----------------------------------------------------------------------
.../camel/component/kafka/KafkaConsumer.java | 54 ++++++++++++--------
1 file changed, 33 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/a107781b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 7733231..82600e7 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -17,6 +17,8 @@
package org.apache.camel.component.kafka;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
@@ -26,6 +28,8 @@ import org.apache.camel.impl.DefaultConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,7 +94,7 @@ public class KafkaConsumer extends DefaultConsumer {
private final String threadId;
private final Properties kafkaProps;
- KafkaFetchRecords(String topicName, String id, Properties kafkaProps) {
+ public KafkaFetchRecords(String topicName, String id, Properties kafkaProps) {
this.topicName = topicName;
this.threadId = topicName + "-" + "Thread " + id;
this.kafkaProps = kafkaProps;
@@ -112,26 +116,34 @@ public class KafkaConsumer extends DefaultConsumer {
consumer.seekToBeginning();
}
while (isRunAllowed() && !isSuspendingOrSuspended()) {
- ConsumerRecords<Object, Object> records = consumer.poll(Long.MAX_VALUE);
- for (ConsumerRecord<Object, Object> record : records) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value());
- }
- Exchange exchange = endpoint.createKafkaExchange(record);
- try {
- processor.process(exchange);
- } catch (Exception e) {
- getExceptionHandler().handleException("Error during processing", exchange, e);
- }
- processed++;
- // if autocommit is false
- if (endpoint.isAutoCommitEnable() != null && !endpoint.isAutoCommitEnable()) {
- if (processed >= endpoint.getBatchSize()) {
- consumer.commitSync();
- processed = 0;
- }
- }
- }
+ ConsumerRecords<Object, Object> allRecords = consumer.poll(Long.MAX_VALUE);
+ // START : CAMEL-9823
+ for (TopicPartition partition : allRecords.partitions()) {
+ List<ConsumerRecord<Object, Object>> partitionRecords = allRecords
+ .records(partition);
+ for (ConsumerRecord<Object, Object> record : partitionRecords) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("partition = {}, offset = {}, key = {}, value = {}", record.partition(), record.offset(), record.key(), record.value());
+ }
+ Exchange exchange = endpoint.createKafkaExchange(record);
+ try {
+ processor.process(exchange);
+ } catch (Exception e) {
+ getExceptionHandler().handleException("Error during processing", exchange, e);
+ }
+ }
+ // if autocommit is false
+ if (endpoint.isAutoCommitEnable() != null
+ && !endpoint.isAutoCommitEnable()) {
+ long partitionLastoffset = partitionRecords.get(
+ partitionRecords.size() - 1).offset();
+ consumer.commitSync(Collections.singletonMap(
+ partition, new OffsetAndMetadata(
+ partitionLastoffset + 1)));
+ }
+
+ }
+ // END : CAMEL-9823
}
LOG.debug("Unsubscribing {} from topic {}", threadId, topicName);
consumer.unsubscribe();
[2/2] camel git commit: Fixed CS
Posted by ac...@apache.org.
Fixed CS
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1eeba05d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1eeba05d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1eeba05d
Branch: refs/heads/master
Commit: 1eeba05d48a0ea12cc8bb4741bb7163d73281022
Parents: a107781
Author: Andrea Cosentino <an...@gmail.com>
Authored: Fri Apr 29 10:50:57 2016 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Fri Apr 29 10:52:24 2016 +0200
----------------------------------------------------------------------
.../camel/component/kafka/KafkaConsumer.java | 51 +++++++++-----------
1 file changed, 23 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/1eeba05d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 82600e7..8649a46 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -94,7 +94,7 @@ public class KafkaConsumer extends DefaultConsumer {
private final String threadId;
private final Properties kafkaProps;
- public KafkaFetchRecords(String topicName, String id, Properties kafkaProps) {
+ KafkaFetchRecords(String topicName, String id, Properties kafkaProps) {
this.topicName = topicName;
this.threadId = topicName + "-" + "Thread " + id;
this.kafkaProps = kafkaProps;
@@ -117,33 +117,28 @@ public class KafkaConsumer extends DefaultConsumer {
}
while (isRunAllowed() && !isSuspendingOrSuspended()) {
ConsumerRecords<Object, Object> allRecords = consumer.poll(Long.MAX_VALUE);
- // START : CAMEL-9823
- for (TopicPartition partition : allRecords.partitions()) {
- List<ConsumerRecord<Object, Object>> partitionRecords = allRecords
- .records(partition);
- for (ConsumerRecord<Object, Object> record : partitionRecords) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("partition = {}, offset = {}, key = {}, value = {}", record.partition(), record.offset(), record.key(), record.value());
- }
- Exchange exchange = endpoint.createKafkaExchange(record);
- try {
- processor.process(exchange);
- } catch (Exception e) {
- getExceptionHandler().handleException("Error during processing", exchange, e);
- }
- }
- // if autocommit is false
- if (endpoint.isAutoCommitEnable() != null
- && !endpoint.isAutoCommitEnable()) {
- long partitionLastoffset = partitionRecords.get(
- partitionRecords.size() - 1).offset();
- consumer.commitSync(Collections.singletonMap(
- partition, new OffsetAndMetadata(
- partitionLastoffset + 1)));
- }
-
- }
- // END : CAMEL-9823
+ for (TopicPartition partition : allRecords.partitions()) {
+ List<ConsumerRecord<Object, Object>> partitionRecords = allRecords
+ .records(partition);
+ for (ConsumerRecord<Object, Object> record : partitionRecords) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("partition = {}, offset = {}, key = {}, value = {}", record.partition(), record.offset(), record.key(), record.value());
+ }
+ Exchange exchange = endpoint.createKafkaExchange(record);
+ try {
+ processor.process(exchange);
+ } catch (Exception e) {
+ getExceptionHandler().handleException("Error during processing", exchange, e);
+ }
+ }
+ // if autocommit is false
+ if (endpoint.isAutoCommitEnable() != null
+ && !endpoint.isAutoCommitEnable()) {
+ long partitionLastoffset = partitionRecords.get(partitionRecords.size() - 1).offset();
+ consumer.commitSync(Collections.singletonMap(
+ partition, new OffsetAndMetadata(partitionLastoffset + 1)));
+ }
+ }
}
LOG.debug("Unsubscribing {} from topic {}", threadId, topicName);
consumer.unsubscribe();