You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2020/07/16 09:11:37 UTC
[kafka] branch 2.4 updated: KAFKA-10160: Kafka MM2 consumer
configuration (#8921)
This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 2eb10ec KAFKA-10160: Kafka MM2 consumer configuration (#8921)
2eb10ec is described below
commit 2eb10ecac214ccb28ca1b29a368d836e435f1637
Author: Satish Bellapu <10...@users.noreply.github.com>
AuthorDate: Thu Jul 16 02:10:45 2020 -0700
KAFKA-10160: Kafka MM2 consumer configuration (#8921)
Reviewers: Mickael Maison <mi...@gmail.com>, Ryanne Dolan <ry...@gmail.com>, Adam Keyser <ad...@gmail.com>
---
.../apache/kafka/connect/mirror/MirrorConnectorConfig.java | 5 +++--
.../kafka/connect/mirror/MirrorConnectorConfigTest.java | 14 ++++++++++++++
2 files changed, 17 insertions(+), 2 deletions(-)
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
index d922ead..eb38cfd 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Map;
import java.util.HashMap;
@@ -229,8 +230,8 @@ public class MirrorConnectorConfig extends AbstractConfig {
props.putAll(originalsWithPrefix(SOURCE_CLUSTER_PREFIX));
props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
props.putAll(originalsWithPrefix(CONSUMER_CLIENT_PREFIX));
- props.put("enable.auto.commit", "false");
- props.put("auto.offset.reset", "earliest");
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
index 8e99779..dfd4146 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
@@ -110,6 +110,20 @@ public class MirrorConnectorConfigTest {
}
@Test
+ public void testConsumerRestOffsetLatestConfs() {
+ MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("consumer.auto.offset.reset", "latest"));
+ Map<String, Object> sourceConf = config.sourceConsumerConfig();
+ assertEquals("latest", sourceConf.get("auto.offset.reset"));
+ }
+
+ @Test
+ public void testConsumerDefaultConfs() {
+ MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps());
+ Map<String, Object> sourceConf = config.sourceConsumerConfig();
+ assertEquals("earliest", sourceConf.get("auto.offset.reset"));
+ }
+
+ @Test
public void testNonMutationOfConfigDef() {
Collection<String> taskSpecificProperties = Arrays.asList(
MirrorConnectorConfig.TASK_TOPIC_PARTITIONS,