You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2020/07/16 09:11:37 UTC

[kafka] branch 2.4 updated: KAFKA-10160: Kafka MM2 consumer configuration (#8921)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 2eb10ec  KAFKA-10160: Kafka MM2 consumer configuration (#8921)
2eb10ec is described below

commit 2eb10ecac214ccb28ca1b29a368d836e435f1637
Author: Satish Bellapu <10...@users.noreply.github.com>
AuthorDate: Thu Jul 16 02:10:45 2020 -0700

    KAFKA-10160: Kafka MM2 consumer configuration (#8921)
    
    
    Reviewers: Mickael Maison <mi...@gmail.com>, Ryanne Dolan <ry...@gmail.com>, Adam Keyser <ad...@gmail.com>
---
 .../apache/kafka/connect/mirror/MirrorConnectorConfig.java |  5 +++--
 .../kafka/connect/mirror/MirrorConnectorConfigTest.java    | 14 ++++++++++++++
 2 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
index d922ead..eb38cfd 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 
 import java.util.Map;
 import java.util.HashMap;
@@ -229,8 +230,8 @@ public class MirrorConnectorConfig extends AbstractConfig {
         props.putAll(originalsWithPrefix(SOURCE_CLUSTER_PREFIX));
         props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
         props.putAll(originalsWithPrefix(CONSUMER_CLIENT_PREFIX));
-        props.put("enable.auto.commit", "false");
-        props.put("auto.offset.reset", "earliest");
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         return props;
     }
 
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
index 8e99779..dfd4146 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
@@ -110,6 +110,20 @@ public class MirrorConnectorConfigTest {
     }
 
     @Test
+    public void testConsumerRestOffsetLatestConfs() {
+        MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("consumer.auto.offset.reset", "latest"));
+        Map<String, Object> sourceConf = config.sourceConsumerConfig();
+        assertEquals("latest", sourceConf.get("auto.offset.reset"));
+    }
+
+    @Test
+    public void testConsumerDefaultConfs() {
+        MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps());
+        Map<String, Object> sourceConf = config.sourceConsumerConfig();
+        assertEquals("earliest", sourceConf.get("auto.offset.reset"));
+    }
+
+    @Test
     public void testNonMutationOfConfigDef() {
         Collection<String> taskSpecificProperties = Arrays.asList(
             MirrorConnectorConfig.TASK_TOPIC_PARTITIONS,