You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/10/22 07:20:23 UTC
[2/2] camel git commit: Added two new consumer properties for the
kafka component: 'offsets.storage' and 'dual.commit.enabled'.
Added two new consumer properties for the kafka component: 'offsets.storage' and 'dual.commit.enabled'.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8254d2ed
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8254d2ed
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8254d2ed
Branch: refs/heads/camel-2.16.x
Commit: 8254d2ed6950d9e68fe5d38363a83d6409803fb2
Parents: 37019e9
Author: Jacob Nelson <ja...@target.com>
Authored: Wed Oct 21 18:53:58 2015 -0500
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Oct 22 07:23:00 2015 +0200
----------------------------------------------------------------------
.../component/kafka/KafkaConfiguration.java | 31 ++++++++++++++++++++
.../camel/component/kafka/KafkaEndpoint.java | 17 +++++++++++
2 files changed, 48 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/8254d2ed/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 4858af9..b03970c 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -81,6 +81,10 @@ public class KafkaConfiguration {
private String autoOffsetReset = "largest";
@UriParam(label = "consumer")
private Integer consumerTimeoutMs;
+ @UriParam(label = "consumer", defaultValue = "zookeeper", enums = "zookeeper,kafka")
+ private String offsetsStorage = "zookeeper";
+ @UriParam(label = "consumer", defaultValue = "true")
+ private Boolean dualCommitEnabled = true;
//Zookeeper configuration properties
@UriParam
@@ -174,6 +178,8 @@ public class KafkaConfiguration {
addPropertyIfNotNull(props, "zookeeper.session.timeout.ms", getZookeeperSessionTimeoutMs());
addPropertyIfNotNull(props, "zookeeper.connection.timeout.ms", getZookeeperConnectionTimeoutMs());
addPropertyIfNotNull(props, "zookeeper.sync.time.ms", getZookeeperSyncTimeMs());
+ addPropertyIfNotNull(props, "offsets.storage", getOffsetsStorage());
+ addPropertyIfNotNull(props, "dual.commit.enabled", isDualCommitEnabled());
return props;
}
@@ -743,4 +749,29 @@ public class KafkaConfiguration {
public void setKeySerializerClass(String keySerializerClass) {
this.keySerializerClass = keySerializerClass;
}
+
+ public String getOffsetsStorage() {
+ return offsetsStorage;
+ }
+
+ /**
+ * Select where offsets should be stored (zookeeper or kafka).
+ */
+ public void setOffsetsStorage(String offsetsStorage) {
+ this.offsetsStorage = offsetsStorage;
+ }
+
+ public Boolean isDualCommitEnabled() {
+ return dualCommitEnabled;
+ }
+
+ /**
+ * If you are using "kafka" as offsets.storage, you can dual commit offsets to ZooKeeper (in addition to Kafka).
+ * This is required during migration from zookeeper-based offset storage to kafka-based offset storage.
+ * With respect to any given consumer group, it is safe to turn this off after all instances within that group have been migrated
+ * to the new version that commits offsets to the broker (instead of directly to ZooKeeper).
+ */
+ public void setDualCommitEnabled(Boolean dualCommitEnabled) {
+ this.dualCommitEnabled = dualCommitEnabled;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/8254d2ed/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 5dfe736..3073385 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -491,4 +491,21 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
public void setBridgeEndpoint(boolean bridgeEndpoint) {
this.bridgeEndpoint = bridgeEndpoint;
}
+
+ public String getOffsetsStorage() {
+ return configuration.getOffsetsStorage();
+ }
+
+ public void setOffsetsStorage(String offsetsStorage) {
+ configuration.setOffsetsStorage(offsetsStorage);
+ }
+
+ public Boolean isDualCommitEnabled() {
+ return configuration.isDualCommitEnabled();
+ }
+
+ public void setDualCommitEnabled(boolean dualCommitEnabled) {
+ configuration.setDualCommitEnabled(dualCommitEnabled);
+ }
+
}