You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/10/22 07:20:23 UTC

[2/2] camel git commit: Added two new consumer properties for the kafka component: 'offsets.storage' and 'dual.commit.enabled'.

Added two new consumer properties for the kafka component: 'offsets.storage' and 'dual.commit.enabled'.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8254d2ed
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8254d2ed
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8254d2ed

Branch: refs/heads/camel-2.16.x
Commit: 8254d2ed6950d9e68fe5d38363a83d6409803fb2
Parents: 37019e9
Author: Jacob Nelson <ja...@target.com>
Authored: Wed Oct 21 18:53:58 2015 -0500
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Oct 22 07:23:00 2015 +0200

----------------------------------------------------------------------
 .../component/kafka/KafkaConfiguration.java     | 31 ++++++++++++++++++++
 .../camel/component/kafka/KafkaEndpoint.java    | 17 +++++++++++
 2 files changed, 48 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8254d2ed/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 4858af9..b03970c 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -81,6 +81,10 @@ public class KafkaConfiguration {
     private String autoOffsetReset = "largest";
     @UriParam(label = "consumer")
     private Integer consumerTimeoutMs;
+    @UriParam(label = "consumer", defaultValue = "zookeeper", enums = "zookeeper,kafka")
+    private String offsetsStorage = "zookeeper";
+    @UriParam(label = "consumer", defaultValue = "true")
+    private Boolean dualCommitEnabled = true;
 
     //Zookeepr configuration properties
     @UriParam
@@ -174,6 +178,8 @@ public class KafkaConfiguration {
         addPropertyIfNotNull(props, "zookeeper.session.timeout.ms", getZookeeperSessionTimeoutMs());
         addPropertyIfNotNull(props, "zookeeper.connection.timeout.ms", getZookeeperConnectionTimeoutMs());
         addPropertyIfNotNull(props, "zookeeper.sync.time.ms", getZookeeperSyncTimeMs());
+        addPropertyIfNotNull(props, "offsets.storage", getOffsetsStorage());
+        addPropertyIfNotNull(props, "dual.commit.enabled", isDualCommitEnabled());
         return props;
     }
 
@@ -743,4 +749,29 @@ public class KafkaConfiguration {
     public void setKeySerializerClass(String keySerializerClass) {
         this.keySerializerClass = keySerializerClass;
     }
+
+    public String getOffsetsStorage() {
+        return offsetsStorage;
+    }
+
+    /**
+     * Select where offsets should be stored (zookeeper or kafka).
+     */
+    public void setOffsetsStorage(String offsetsStorage) {
+        this.offsetsStorage = offsetsStorage;
+    }
+
+    public Boolean isDualCommitEnabled() {
+        return dualCommitEnabled;
+    }
+
+    /**
+     * If you are using "kafka" as offsets.storage, you can dual commit offsets to ZooKeeper (in addition to Kafka).
+     * This is required during migration from zookeeper-based offset storage to kafka-based offset storage.
+     * With respect to any given consumer group, it is safe to turn this off after all instances within that group have been migrated
+     * to the new version that commits offsets to the broker (instead of directly to ZooKeeper).
+     */
+    public void setDualCommitEnabled(Boolean dualCommitEnabled) {
+        this.dualCommitEnabled = dualCommitEnabled;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/8254d2ed/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 5dfe736..3073385 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -491,4 +491,21 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
     public void setBridgeEndpoint(boolean bridgeEndpoint) {
         this.bridgeEndpoint = bridgeEndpoint;
     }
+
+    public String getOffsetsStorage() {
+        return configuration.getOffsetsStorage();
+    }
+
+    public void setOffsetsStorage(String offsetsStorage) {
+        configuration.setOffsetsStorage(offsetsStorage);
+    }
+
+    public Boolean isDualCommitEnabled() {
+        return configuration.isDualCommitEnabled();
+    }
+
+    public void setDualCommitEnabled(boolean dualCommitEnabled) {
+        configuration.setDualCommitEnabled(dualCommitEnabled);
+    }
+
 }