You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/03/05 17:49:48 UTC

camel git commit: CAMEL-10944: camel-kafka - When consumer stop it should auto commit

Repository: camel
Updated Branches:
  refs/heads/master 4d567c485 -> f42c97e17


CAMEL-10944: camel-kafka - When consumer stop it should auto commit


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f42c97e1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f42c97e1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f42c97e1

Branch: refs/heads/master
Commit: f42c97e17f296ce3bfa7814854ac5599a0571752
Parents: 4d567c4
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Mar 5 18:48:02 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Mar 5 18:49:38 2017 +0100

----------------------------------------------------------------------
 .../camel-kafka/src/main/docs/kafka-component.adoc    |  3 ++-
 .../camel/component/kafka/KafkaConfiguration.java     | 14 ++++++++++++++
 .../apache/camel/component/kafka/KafkaConsumer.java   | 11 +++++++++++
 3 files changed, 27 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/f42c97e1/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index c7e1315..f9f5aa5 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -66,7 +66,7 @@ The Kafka component is configured using the URI syntax with the following path a
 |=======================================================================
 {% endraw %}
 
-#### 81 query parameters:
+#### 82 query parameters:
 
 {% raw %}
 [width="100%",cols="2,1,1m,1m,5",options="header"]
@@ -78,6 +78,7 @@ The Kafka component is configured using the URI syntax with the following path a
 | partitioner | common | org.apache.kafka.clients.producer.internals.DefaultPartitioner | String | The partitioner class for partitioning messages amongst sub-topics. The default partitioner is based on the hash of the key.
 | autoCommitEnable | consumer | true | Boolean | If true periodically commit to ZooKeeper the offset of messages already fetched by the consumer. This committed offset will be used when the process fails as the position from which the new consumer will begin.
 | autoCommitIntervalMs | consumer | 5000 | Integer | The frequency in ms that the consumer offsets are committed to zookeeper.
+| autoCommitOnStop | consumer | sync | String | Whether to perform an explicit auto commit when the consumer stops to ensure the broker has a commit from the last consumed message. This requires the option autoCommitEnable to be turned on. The possible values are: sync, async, or none.
 | autoOffsetReset | consumer | latest | String | What to do when there is no initial offset in ZooKeeper or if an offset is out of range: smallest : automatically reset the offset to the smallest offset largest : automatically reset the offset to the largest offset fail: throw exception to the consumer
 | bridgeErrorHandler | consumer | false | boolean | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN or ERROR level and ignored.
 | checkCrcs | consumer | true | Boolean | Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead so it may be disabled in cases seeking extreme performance.

http://git-wip-us.apache.org/repos/asf/camel/blob/f42c97e1/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 436287b..e96614f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -114,6 +114,8 @@ public class KafkaConfiguration {
     private String consumerId;
     @UriParam(label = "consumer", defaultValue = "true")
     private Boolean autoCommitEnable = true;
+    @UriParam(label = "consumer", defaultValue = "sync", enums = "sync,async,none")
+    private String autoCommitOnStop = "sync";
     @UriParam(label = "consumer")
     private StateRepository<String, String> offsetRepository;
 
@@ -612,6 +614,18 @@ public class KafkaConfiguration {
         this.autoOffsetReset = autoOffsetReset;
     }
 
+    public String getAutoCommitOnStop() {
+        return autoCommitOnStop;
+    }
+
+    /**
+     * Whether to perform an explicit auto commit when the consumer stops to ensure the broker
+     * has a commit from the last consumed message. This requires the option autoCommitEnable to be turned on.
+     */
+    public void setAutoCommitOnStop(String autoCommitOnStop) {
+        this.autoCommitOnStop = autoCommitOnStop;
+    }
+
     public String getBrokers() {
         return brokers;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/f42c97e1/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 711da6b..321aebb 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -217,6 +217,17 @@ public class KafkaConsumer extends DefaultConsumer {
                         }
                     }
                 }
+
+                if (endpoint.getConfiguration().isAutoCommitEnable() != null && endpoint.getConfiguration().isAutoCommitEnable()) {
+                    if ("async".equals(endpoint.getConfiguration().getAutoCommitOnStop())) {
+                        LOG.info("Auto commitAsync on stop {} from topic {}", threadId, topicName);
+                        consumer.commitAsync();
+                    } else if ("sync".equals(endpoint.getConfiguration().getAutoCommitOnStop())) {
+                        LOG.info("Auto commitSync on stop {} from topic {}", threadId, topicName);
+                        consumer.commitSync();
+                    }
+                }
+
                 LOG.info("Unsubscribing {} from topic {}", threadId, topicName);
                 consumer.unsubscribe();
             } catch (InterruptException e) {