You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/11/10 20:07:36 UTC
kafka git commit: KAFKA-2798: Use prefixedd configurations for Kafka
Connect producer and consumer settings so they do not conflict with the
distributed herder's settings.
Repository: kafka
Updated Branches:
refs/heads/trunk ae5a5d7c0 -> 403d89ede
KAFKA-2798: Use prefixedd configurations for Kafka Connect producer and consumer settings so they do not conflict with the distributed herder's settings.
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Reviewers: Gwen Shapira
Closes #486 from ewencp/kafka-2798-conflicting-herder-producer-consumer-configs
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/403d89ed
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/403d89ed
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/403d89ed
Branch: refs/heads/trunk
Commit: 403d89edeaa7808f71c0e7318411c925895210f2
Parents: ae5a5d7
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Tue Nov 10 11:07:26 2015 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Nov 10 11:07:26 2015 -0800
----------------------------------------------------------------------
.../java/org/apache/kafka/common/config/AbstractConfig.java | 8 --------
.../main/java/org/apache/kafka/connect/runtime/Worker.java | 2 +-
.../org/apache/kafka/connect/runtime/WorkerSinkTask.java | 5 ++++-
3 files changed, 5 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/403d89ed/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 07b64c0..1029356 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -105,14 +105,6 @@ public class AbstractConfig {
return keys;
}
- public Map<String, Object> unusedConfigs() {
- Set<String> unusedKeys = this.unused();
- Map<String, Object> unusedProps = new HashMap<>();
- for (String key : unusedKeys)
- unusedProps.put(key, this.originals.get(key));
- return unusedProps;
- }
-
public Map<String, Object> originals() {
Map<String, Object> copy = new RecordingMap<>();
copy.putAll(originals);
http://git-wip-us.apache.org/repos/asf/kafka/blob/403d89ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 359a79c..f5b23ec 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -101,7 +101,7 @@ public class Worker {
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
- producerProps.putAll(config.unusedConfigs());
+ producerProps.putAll(config.originalsWithPrefix("producer."));
producer = new KafkaProducer<>(producerProps);
http://git-wip-us.apache.org/repos/asf/kafka/blob/403d89ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 643b10e..e0a3e04 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -233,7 +233,8 @@ class WorkerSinkTask implements WorkerTask {
private KafkaConsumer<byte[], byte[]> createConsumer() {
// Include any unknown worker configs so consumer configs can be set globally on the worker
// and through to the task
- Map<String, Object> props = workerConfig.unusedConfigs();
+ Map<String, Object> props = new HashMap<>();
+
props.put(ConsumerConfig.GROUP_ID_CONFIG, "connect-" + id.connector());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
@@ -242,6 +243,8 @@ class WorkerSinkTask implements WorkerTask {
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ props.putAll(workerConfig.originalsWithPrefix("consumer."));
+
KafkaConsumer<byte[], byte[]> newConsumer;
try {
newConsumer = new KafkaConsumer<>(props);