Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/11/30 07:41:00 UTC

[jira] [Commented] (KAFKA-7551) Refactor to create both producer & consumer in Worker

    [ https://issues.apache.org/jira/browse/KAFKA-7551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704365#comment-16704365 ] 

ASF GitHub Bot commented on KAFKA-7551:
---------------------------------------

ewencp closed pull request #5842: KAFKA-7551: Refactor to create producer & consumer in the worker.
URL: https://github.com/apache/kafka/pull/5842

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 6e021b90a07..49ba4fdc38b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.MetricName;
@@ -50,6 +52,7 @@
 import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
 import org.apache.kafka.connect.storage.OffsetStorageWriter;
 import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.SinkUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -89,7 +92,6 @@
     private final Converter internalKeyConverter;
     private final Converter internalValueConverter;
     private final OffsetBackingStore offsetBackingStore;
-    private final Map<String, Object> producerProps;
 
     private final ConcurrentMap<String, WorkerConnector> connectors = new ConcurrentHashMap<>();
     private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>();
@@ -129,19 +131,6 @@ public Worker(
 
         this.workerConfigTransformer = initConfigTransformer();
 
-        producerProps = new HashMap<>();
-        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
-        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-        // These settings are designed to ensure there is no data loss. They *may* be overridden via configs passed to the
-        // worker, but this may compromise the delivery guarantees of Kafka Connect.
-        producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
-        producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE));
-        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
-        producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
-        producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
-        // User-specified overrides
-        producerProps.putAll(config.originalsWithPrefix("producer."));
     }
 
     private WorkerConfigTransformer initConfigTransformer() {
@@ -498,6 +487,7 @@ private WorkerTask buildWorkerTask(ClusterConfigState configState,
                     internalKeyConverter, internalValueConverter);
             OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
                     internalKeyConverter, internalValueConverter);
+            Map<String, Object> producerProps = producerConfigs(config);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
 
             // Note we pass the configState as it performs dynamic transformations under the covers
@@ -508,15 +498,54 @@ private WorkerTask buildWorkerTask(ClusterConfigState configState,
             TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations(), retryWithToleranceOperator);
             SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings());
             retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics));
+
+            Map<String, Object> consumerProps = consumerConfigs(id, config);
+            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
+
             return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
-                    valueConverter, headerConverter, transformationChain, loader, time,
-                    retryWithToleranceOperator);
+                                      valueConverter, headerConverter, transformationChain, consumer, loader, time,
+                                      retryWithToleranceOperator);
         } else {
             log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
             throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
         }
     }
 
+    static Map<String, Object> producerConfigs(WorkerConfig config) {
+        Map<String, Object> producerProps = new HashMap<>();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        // These settings are designed to ensure there is no data loss. They *may* be overridden via configs passed to the
+        // worker, but this may compromise the delivery guarantees of Kafka Connect.
+        producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
+        producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE));
+        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
+        producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
+        // User-specified overrides
+        producerProps.putAll(config.originalsWithPrefix("producer."));
+        return producerProps;
+    }
+
+
+    static Map<String, Object> consumerConfigs(ConnectorTaskId id, WorkerConfig config) {
+        // Include any unknown worker configs so consumer configs can be set globally on the worker
+        // and through to the task
+        Map<String, Object> consumerProps = new HashMap<>();
+
+        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, SinkUtils.consumerGroupId(id.connector()));
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                  Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
+        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+
+        consumerProps.putAll(config.originalsWithPrefix("consumer."));
+        return consumerProps;
+    }
+
     ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) {
         return new ErrorHandlingMetrics(id, metrics);
     }
@@ -530,6 +559,7 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) {
         // check if topic for dead letter queue exists
         String topic = connConfig.dlqTopicName();
         if (topic != null && !topic.isEmpty()) {
+            Map<String, Object> producerProps = producerConfigs(config);
             DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(config, id, connConfig, producerProps, errorHandlingMetrics);
             reporters.add(reporter);
         }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 39e0c6d53f6..a112bfa41ae 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.connect.runtime;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -33,7 +32,6 @@
 import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.RetriableException;
@@ -49,7 +47,6 @@
 import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
-import org.apache.kafka.connect.util.SinkUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -105,6 +102,7 @@ public WorkerSinkTask(ConnectorTaskId id,
                           Converter valueConverter,
                           HeaderConverter headerConverter,
                           TransformationChain<SinkRecord> transformationChain,
+                          KafkaConsumer<byte[], byte[]> consumer,
                           ClassLoader loader,
                           Time time,
                           RetryWithToleranceOperator retryWithToleranceOperator) {
@@ -131,13 +129,13 @@ public WorkerSinkTask(ConnectorTaskId id,
         this.commitFailures = 0;
         this.sinkTaskMetricsGroup = new SinkTaskMetricsGroup(id, connectMetrics);
         this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
+        this.consumer = consumer;
     }
 
     @Override
     public void initialize(TaskConfig taskConfig) {
         try {
             this.taskConfig = taskConfig.originalsStrings();
-            this.consumer = createConsumer();
             this.context = new WorkerSinkTaskContext(consumer, this, configState);
         } catch (Throwable t) {
             log.error("{} Task failed initialization and will not be started.", this, t);
@@ -455,31 +453,6 @@ public String toString() {
         return msgs;
     }
 
-    private KafkaConsumer<byte[], byte[]> createConsumer() {
-        // Include any unknown worker configs so consumer configs can be set globally on the worker
-        // and through to the task
-        Map<String, Object> props = new HashMap<>();
-
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, SinkUtils.consumerGroupId(id.connector()));
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
-                Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-
-        props.putAll(workerConfig.originalsWithPrefix("consumer."));
-
-        KafkaConsumer<byte[], byte[]> newConsumer;
-        try {
-            newConsumer = new KafkaConsumer<>(props);
-        } catch (Throwable t) {
-            throw new ConnectException("Failed to create consumer", t);
-        }
-
-        return newConsumer;
-    }
-
     private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) {
         origOffsets.clear();
         for (ConsumerRecord<byte[], byte[]> msg : msgs) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 6d92c34adef..5d223f4713e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -352,7 +352,6 @@ private void assertErrorHandlingMetricValue(String name, double expected) {
     }
 
     private void expectInitializeTask() throws Exception {
-        PowerMock.expectPrivate(workerSinkTask, "createConsumer").andReturn(consumer);
         consumer.subscribe(EasyMock.eq(singletonList(TOPIC)), EasyMock.capture(rebalanceListener));
         PowerMock.expectLastCall();
 
@@ -371,11 +370,10 @@ private void createSinkTask(TargetState initialState, RetryWithToleranceOperator
 
         TransformationChain<SinkRecord> sinkTransforms = new TransformationChain<>(singletonList(new FaultyPassthrough<SinkRecord>()), retryWithToleranceOperator);
 
-        workerSinkTask = PowerMock.createPartialMock(
-                WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, statusListener, initialState, workerConfig,
-                ClusterConfigState.EMPTY, metrics, converter, converter,
-                headerConverter, sinkTransforms, pluginLoader, time, retryWithToleranceOperator);
+        workerSinkTask = new WorkerSinkTask(
+            taskId, sinkTask, statusListener, initialState, workerConfig,
+            ClusterConfigState.EMPTY, metrics, converter, converter,
+            headerConverter, sinkTransforms, consumer, pluginLoader, time, retryWithToleranceOperator);
     }
 
     private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 7223c3bce22..3e047ff9ed5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -162,12 +162,11 @@ public void setUp() {
     }
 
     private void createTask(TargetState initialState) {
-        workerTask = PowerMock.createPartialMock(
-                WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics,
-                keyConverter, valueConverter, headerConverter,
-                transformationChain, pluginLoader, time,
-                RetryWithToleranceOperatorTest.NOOP_OPERATOR);
+        workerTask = new WorkerSinkTask(
+            taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics,
+            keyConverter, valueConverter, headerConverter,
+            transformationChain, consumer, pluginLoader, time,
+            RetryWithToleranceOperatorTest.NOOP_OPERATOR);
     }
 
     @After
@@ -1167,7 +1166,6 @@ public void testTopicsRegex() throws Exception {
 
         createTask(TargetState.PAUSED);
 
-        PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
         consumer.subscribe(EasyMock.capture(topicsRegex), EasyMock.capture(rebalanceListener));
         PowerMock.expectLastCall();
 
@@ -1255,7 +1253,6 @@ public void testMetricsGroup() {
     }
 
     private void expectInitializeTask() throws Exception {
-        PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
         consumer.subscribe(EasyMock.eq(asList(TOPIC)), EasyMock.capture(rebalanceListener));
         PowerMock.expectLastCall();
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index d49c1cd99ff..6e2b01ce7bc 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -137,12 +137,11 @@ public void setup() {
         workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
         pluginLoader = PowerMock.createMock(PluginClassLoader.class);
         workerConfig = new StandaloneConfig(workerProps);
-        workerTask = PowerMock.createPartialMock(
-                WorkerSinkTask.class, new String[]{"createConsumer"},
+        workerTask = new WorkerSinkTask(
                 taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, keyConverter,
                 valueConverter, headerConverter,
                 new TransformationChain<>(Collections.emptyList(), RetryWithToleranceOperatorTest.NOOP_OPERATOR),
-                pluginLoader, time, RetryWithToleranceOperatorTest.NOOP_OPERATOR);
+                consumer, pluginLoader, time, RetryWithToleranceOperatorTest.NOOP_OPERATOR);
 
         recordsReturned = 0;
     }
@@ -509,7 +508,6 @@ public Object answer() throws Throwable {
     }
 
     private void expectInitializeTask() throws Exception {
-        PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
 
         consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), EasyMock.capture(rebalanceListener));
         PowerMock.expectLastCall();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index f3cacc469da..8f15c87c501 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -17,7 +17,9 @@
 package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
@@ -87,9 +89,13 @@
     private static final ConnectorTaskId TASK_ID = new ConnectorTaskId("job", 0);
     private static final String WORKER_ID = "localhost:8083";
 
+    private Map<String, String> workerProps = new HashMap<>();
     private WorkerConfig config;
     private Worker worker;
 
+    private Map<String, String> defaultProducerConfigs = new HashMap<>();
+    private Map<String, String> defaultConsumerConfigs = new HashMap<>();
+
     @Mock
     private Plugins plugins;
     @Mock
@@ -116,8 +122,6 @@
     @Before
     public void setup() {
         super.setup();
-
-        Map<String, String> workerProps = new HashMap<>();
         workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
         workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
         workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
@@ -128,6 +132,25 @@ public void setup() {
         workerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
         config = new StandaloneConfig(workerProps);
 
+        defaultProducerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        defaultProducerConfigs.put(
+            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        defaultProducerConfigs.put(
+            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        defaultProducerConfigs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
+        defaultProducerConfigs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE));
+        defaultProducerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
+        defaultProducerConfigs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
+        defaultProducerConfigs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
+
+        defaultConsumerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        defaultConsumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        defaultConsumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        defaultConsumerConfigs
+            .put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        defaultConsumerConfigs
+            .put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+
         PowerMock.mockStatic(Plugins.class);
     }
 
@@ -804,6 +827,45 @@ public void testConverterOverrides() throws Exception {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testProducerConfigsWithoutOverrides() {
+        assertEquals(defaultProducerConfigs, Worker.producerConfigs(config));
+    }
+
+    @Test
+    public void testProducerConfigsWithOverrides() {
+        Map<String, String> props = new HashMap<>(workerProps);
+        props.put("producer.acks", "-1");
+        props.put("producer.linger.ms", "1000");
+        WorkerConfig configWithOverrides = new StandaloneConfig(props);
+
+        Map<String, String> expectedConfigs = new HashMap<>(defaultProducerConfigs);
+        expectedConfigs.put("acks", "-1");
+        expectedConfigs.put("linger.ms", "1000");
+        assertEquals(expectedConfigs, Worker.producerConfigs(configWithOverrides));
+    }
+
+    @Test
+    public void testConsumerConfigsWithoutOverrides() {
+        Map<String, String> expectedConfigs = new HashMap<>(defaultConsumerConfigs);
+        expectedConfigs.put("group.id", "connect-test");
+        assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), config));
+    }
+
+    @Test
+    public void testConsumerConfigsWithOverrides() {
+        Map<String, String> props = new HashMap<>(workerProps);
+        props.put("consumer.auto.offset.reset", "latest");
+        props.put("consumer.max.poll.records", "1000");
+        WorkerConfig configWithOverrides = new StandaloneConfig(props);
+
+        Map<String, String> expectedConfigs = new HashMap<>(defaultConsumerConfigs);
+        expectedConfigs.put("group.id", "connect-test");
+        expectedConfigs.put("auto.offset.reset", "latest");
+        expectedConfigs.put("max.poll.records", "1000");
+        assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), configWithOverrides));
+    }
+
     private void assertStatistics(Worker worker, int connectors, int tasks) {
         MetricGroup workerMetrics = worker.workerMetricsGroup().metricGroup();
         assertEquals(connectors, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "connector-count"), 0.0001d);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Refactor to create both producer & consumer in Worker
> -----------------------------------------------------
>
>                 Key: KAFKA-7551
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7551
>             Project: Kafka
>          Issue Type: Task
>          Components: KafkaConnect
>            Reporter: Magesh kumar Nandakumar
>            Assignee: Magesh kumar Nandakumar
>            Priority: Minor
>             Fix For: 2.2.0
>
>
> In distributed mode, the producer is created in the Worker while the consumer is created in the WorkerSinkTask. The proposal is to refactor this so that both are created in the Worker. This does not affect any functionality; it is purely a refactoring to make the code consistent.
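
For readers skimming the diff, the heart of the change in
Worker.buildWorkerTask is small. A condensed excerpt of the new wiring
(argument lists taken from the patch above; this fragment is illustrative
rather than independently compilable):

    // Before: WorkerSinkTask built its own consumer in initialize() via a
    // private createConsumer() method, which tests had to stub out with
    // PowerMock partial mocks.
    // After: the Worker constructs both clients per task...
    Map<String, Object> producerProps = producerConfigs(config);
    KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);

    Map<String, Object> consumerProps = consumerConfigs(id, config);
    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);

    // ...and injects the consumer through the WorkerSinkTask constructor, so
    // the tests above can pass a mock consumer directly instead of stubbing
    // a private method.
    return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState,
            config, configState, metrics, keyConverter, valueConverter,
            headerConverter, transformationChain, consumer, loader, time,
            retryWithToleranceOperator);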



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)