You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ab...@apache.org on 2022/09/21 06:17:27 UTC

[druid] branch master updated: Add KafkaConfigOverrides extension point (#13122)

This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 331e6d707b Add KafkaConfigOverrides extension point (#13122)
331e6d707b is described below

commit 331e6d707b003391d44ee583dbc328b2b6359425
Author: Jonathan Wei <jo...@users.noreply.github.com>
AuthorDate: Wed Sep 21 01:17:19 2022 -0500

    Add KafkaConfigOverrides extension point (#13122)
    
    * Add KafkaConfigOverrides extension point
    
    * X
---
 .../druid/indexing/kafka/KafkaIndexTask.java       |   6 +-
 .../indexing/kafka/KafkaIndexTaskIOConfig.java     |  23 +++-
 .../druid/indexing/kafka/KafkaRecordSupplier.java  |  25 +++-
 .../druid/indexing/kafka/KafkaSamplerSpec.java     |   2 +-
 .../indexing/kafka/supervisor/KafkaSupervisor.java |   9 +-
 .../kafka/supervisor/KafkaSupervisorIOConfig.java  |  14 ++-
 .../druid/indexing/kafka/KafkaIOConfigTest.java    |   1 +
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   | 136 ++++++++++++++-------
 .../indexing/kafka/KafkaRecordSupplierTest.java    |  61 ++++++---
 .../druid/indexing/kafka/KafkaSamplerSpecTest.java |   2 +
 .../supervisor/KafkaSupervisorIOConfigTest.java    |   1 +
 .../kafka/supervisor/KafkaSupervisorTest.java      |  10 +-
 .../apache/druid/indexing/common/task/Tasks.java   |   2 +
 .../extension/KafkaConfigOverrides.java            |  45 +++++++
 14 files changed, 257 insertions(+), 80 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index e24072e5a9..700d2d12d9 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -94,12 +94,12 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long, Kafka
     ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
     try {
       Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
-
-      final Map<String, Object> props = new HashMap<>(((KafkaIndexTaskIOConfig) super.ioConfig).getConsumerProperties());
+      KafkaIndexTaskIOConfig kafkaIndexTaskIOConfig = (KafkaIndexTaskIOConfig) super.ioConfig;
+      final Map<String, Object> props = new HashMap<>(kafkaIndexTaskIOConfig.getConsumerProperties());
 
       props.put("auto.offset.reset", "none");
 
-      return new KafkaRecordSupplier(props, configMapper);
+      return new KafkaRecordSupplier(props, configMapper, kafkaIndexTaskIOConfig.getConfigOverrides());
     }
     finally {
       Thread.currentThread().setContextClassLoader(currCtxCl);
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
index ece3fa5ba8..1f4808ad68 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
@@ -27,6 +27,7 @@ import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
@@ -36,6 +37,7 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
 {
   private final Map<String, Object> consumerProperties;
   private final long pollTimeout;
+  private final KafkaConfigOverrides configOverrides;
 
   @JsonCreator
   public KafkaIndexTaskIOConfig(
@@ -56,7 +58,8 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
       @JsonProperty("useTransaction") Boolean useTransaction,
       @JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
       @JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
-      @JsonProperty("inputFormat") @Nullable InputFormat inputFormat
+      @JsonProperty("inputFormat") @Nullable InputFormat inputFormat,
+      @JsonProperty("configOverrides") @Nullable KafkaConfigOverrides configOverrides
   )
   {
     super(
@@ -74,13 +77,14 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
 
     this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
     this.pollTimeout = pollTimeout != null ? pollTimeout : KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS;
+    this.configOverrides = configOverrides;
 
     final SeekableStreamEndSequenceNumbers<Integer, Long> myEndSequenceNumbers = getEndSequenceNumbers();
     for (int partition : myEndSequenceNumbers.getPartitionSequenceNumberMap().keySet()) {
       Preconditions.checkArgument(
           myEndSequenceNumbers.getPartitionSequenceNumberMap()
-                       .get(partition)
-                       .compareTo(getStartSequenceNumbers().getPartitionSequenceNumberMap().get(partition)) >= 0,
+                              .get(partition)
+                              .compareTo(getStartSequenceNumbers().getPartitionSequenceNumberMap().get(partition)) >= 0,
           "end offset must be >= start offset for partition[%s]",
           partition
       );
@@ -97,7 +101,8 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
       Boolean useTransaction,
       DateTime minimumMessageTime,
       DateTime maximumMessageTime,
-      InputFormat inputFormat
+      InputFormat inputFormat,
+      KafkaConfigOverrides configOverrides
   )
   {
     this(
@@ -112,7 +117,8 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
         useTransaction,
         minimumMessageTime,
         maximumMessageTime,
-        inputFormat
+        inputFormat,
+        configOverrides
     );
   }
 
@@ -156,6 +162,12 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
     return pollTimeout;
   }
 
+  @JsonProperty
+  public KafkaConfigOverrides getConfigOverrides()
+  {
+    return configOverrides;
+  }
+
   @Override
   public String toString()
   {
@@ -169,6 +181,7 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
            ", useTransaction=" + isUseTransaction() +
            ", minimumMessageTime=" + getMinimumMessageTime() +
            ", maximumMessageTime=" + getMaximumMessageTime() +
+           ", configOverrides=" + getConfigOverrides() +
            '}';
   }
 }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
index f6d37ff6a9..255250d924 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -30,6 +30,7 @@ import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.common.StreamException;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.metadata.DynamicConfigProvider;
@@ -63,10 +64,11 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaR
 
   public KafkaRecordSupplier(
       Map<String, Object> consumerProperties,
-      ObjectMapper sortingMapper
+      ObjectMapper sortingMapper,
+      KafkaConfigOverrides configOverrides
   )
   {
-    this(getKafkaConsumer(sortingMapper, consumerProperties));
+    this(getKafkaConsumer(sortingMapper, consumerProperties, configOverrides));
   }
 
   @VisibleForTesting
@@ -228,7 +230,6 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaR
     if (dynamicConfigProviderJson != null) {
       DynamicConfigProvider dynamicConfigProvider = configMapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class);
       Map<String, String> dynamicConfig = dynamicConfigProvider.getConfig();
-
       for (Map.Entry<String, String> e : dynamicConfig.entrySet()) {
         properties.setProperty(e.getKey(), e.getValue());
       }
@@ -268,11 +269,25 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaR
     return deserializerObject;
   }
 
-  private static KafkaConsumer<byte[], byte[]> getKafkaConsumer(ObjectMapper sortingMapper, Map<String, Object> consumerProperties)
+  public static KafkaConsumer<byte[], byte[]> getKafkaConsumer(
+      ObjectMapper sortingMapper,
+      Map<String, Object> consumerProperties,
+      KafkaConfigOverrides configOverrides
+  )
   {
     final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
     final Properties props = new Properties();
-    addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties);
+    Map<String, Object> effectiveConsumerProperties;
+    if (configOverrides != null) {
+      effectiveConsumerProperties = configOverrides.overrideConfigs(consumerProperties);
+    } else {
+      effectiveConsumerProperties = consumerProperties;
+    }
+    addConsumerPropertiesFromConfig(
+        props,
+        sortingMapper,
+        effectiveConsumerProperties
+    );
     props.putIfAbsent("isolation.level", "read_committed");
     props.putIfAbsent("group.id", StringUtils.format("kafka-supervisor-%s", IdUtils.getRandomId()));
     props.putAll(consumerConfigs);
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java
index 3981728cfc..d2c6a105de 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java
@@ -63,7 +63,7 @@ public class KafkaSamplerSpec extends SeekableStreamSamplerSpec
       props.put("auto.offset.reset", "none");
       props.put("request.timeout.ms", Integer.toString(samplerConfig.getTimeoutMs()));
 
-      return new KafkaRecordSupplier(props, objectMapper);
+      return new KafkaRecordSupplier(props, objectMapper, ((KafkaSupervisorIOConfig) ioConfig).getConfigOverrides());
     }
     finally {
       Thread.currentThread().setContextClassLoader(currCtxCl);
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index a02568a8ee..9bac92429a 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -126,7 +126,11 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long, Kaf
   @Override
   protected RecordSupplier<Integer, Long, KafkaRecordEntity> setupRecordSupplier()
   {
-    return new KafkaRecordSupplier(spec.getIoConfig().getConsumerProperties(), sortingMapper);
+    return new KafkaRecordSupplier(
+        spec.getIoConfig().getConsumerProperties(),
+        sortingMapper,
+        spec.getIoConfig().getConfigOverrides()
+    );
   }
 
   @Override
@@ -197,7 +201,8 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long, Kaf
         true,
         minimumMessageTime,
         maximumMessageTime,
-        ioConfig.getInputFormat()
+        ioConfig.getInputFormat(),
+        kafkaIoConfig.getConfigOverrides()
     );
   }
 
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
index 0625ead097..2d6a800942 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
 import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
 import org.apache.druid.java.util.common.StringUtils;
@@ -43,7 +44,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
 
   private final Map<String, Object> consumerProperties;
   private final long pollTimeout;
-
+  private final KafkaConfigOverrides configOverrides;
 
   @JsonCreator
   public KafkaSupervisorIOConfig(
@@ -61,7 +62,8 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
       @JsonProperty("completionTimeout") Period completionTimeout,
       @JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod,
       @JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod,
-      @JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime
+      @JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime,
+      @JsonProperty("configOverrides") KafkaConfigOverrides configOverrides
   )
   {
     super(
@@ -86,6 +88,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
         StringUtils.format("consumerProperties must contain entry for [%s]", BOOTSTRAP_SERVERS_KEY)
     );
     this.pollTimeout = pollTimeout != null ? pollTimeout : DEFAULT_POLL_TIMEOUT_MILLIS;
+    this.configOverrides = configOverrides;
   }
 
   @JsonProperty
@@ -112,6 +115,12 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
     return isUseEarliestSequenceNumber();
   }
 
+  @JsonProperty
+  public KafkaConfigOverrides getConfigOverrides()
+  {
+    return configOverrides;
+  }
+
   @Override
   public String toString()
   {
@@ -130,6 +139,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
            ", earlyMessageRejectionPeriod=" + getEarlyMessageRejectionPeriod() +
            ", lateMessageRejectionPeriod=" + getLateMessageRejectionPeriod() +
            ", lateMessageRejectionStartDateTime=" + getLateMessageRejectionStartDateTime() +
+           ", configOverrides=" + getConfigOverrides() +
            '}';
   }
 
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java
index 4de3c06e78..60d071688e 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java
@@ -317,6 +317,7 @@ public class KafkaIOConfigTest
         true,
         DateTimes.nowUtc(),
         DateTimes.nowUtc(),
+        null,
         null
     );
     final byte[] json = mapper.writeValueAsBytes(currentConfig);
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 6e7bee5b24..b1dc4545aa 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -404,7 +404,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
     Assert.assertTrue(task.supportsQueries());
@@ -463,7 +464,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
     final ListenableFuture<TaskStatus> future = runTask(task);
@@ -509,7 +511,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
     task.addToContext(Tasks.STORE_EMPTY_COLUMNS_KEY, false);
@@ -543,6 +546,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
+            null,
             null
         )
     );
@@ -587,7 +591,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
 
@@ -641,7 +646,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
     final ListenableFuture<TaskStatus> future = runTask(task);
@@ -719,7 +725,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
     final ListenableFuture<TaskStatus> future = runTask(task);
@@ -823,7 +830,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
     final ListenableFuture<TaskStatus> future = runTask(task);
@@ -950,7 +958,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
     final ListenableFuture<TaskStatus> future = runTask(task);
@@ -1036,7 +1045,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
     final KafkaIndexTask staleReplica = createTask(
@@ -1051,7 +1061,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
 
@@ -1119,7 +1130,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             DateTimes.of("2010"),
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
 
@@ -1170,7 +1182,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             DateTimes.of("2010"),
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
 
@@ -1230,7 +1243,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
 
@@ -1305,7 +1319,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            new TestKafkaInputFormat(INPUT_FORMAT)
+            new TestKafkaInputFormat(INPUT_FORMAT),
+            null
         )
     );
     Assert.assertTrue(task.supportsQueries());
@@ -1381,7 +1396,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            KAFKA_INPUT_FORMAT
+            KAFKA_INPUT_FORMAT,
+            null
         )
     );
     Assert.assertTrue(task.supportsQueries());
@@ -1436,7 +1452,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
 
@@ -1474,7 +1491,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
 
@@ -1523,7 +1541,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
 
@@ -1577,7 +1596,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
 
@@ -1618,7 +1638,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
 
@@ -1712,7 +1733,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
 
@@ -1786,7 +1808,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
     final KafkaIndexTask task2 = createTask(
@@ -1801,7 +1824,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
 
@@ -1852,7 +1876,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
     final KafkaIndexTask task2 = createTask(
@@ -1867,7 +1892,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
 
@@ -1920,7 +1946,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             false,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
     final KafkaIndexTask task2 = createTask(
@@ -1935,7 +1962,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             false,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
 
@@ -1986,7 +2014,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
 
@@ -2034,7 +2063,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
     final KafkaIndexTask task2 = createTask(
@@ -2049,7 +2079,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
 
@@ -2103,7 +2134,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
 
@@ -2144,7 +2176,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
 
@@ -2206,7 +2239,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
 
@@ -2255,7 +2289,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
 
@@ -2317,7 +2352,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
 
@@ -2407,7 +2443,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
 
@@ -2443,7 +2480,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
 
@@ -2489,7 +2527,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         ),
         context
     );
@@ -2536,7 +2575,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
 
@@ -2574,7 +2614,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
 
@@ -2685,7 +2726,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
                     true,
                     null,
                     null,
-                    INPUT_FORMAT
+                    INPUT_FORMAT,
+                    null
             )
     );
 
@@ -2744,7 +2786,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
     final ListenableFuture<TaskStatus> future = runTask(task);
@@ -2766,7 +2809,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
 
@@ -2815,7 +2859,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
 
@@ -3207,7 +3252,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
 
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
index 897a586985..b480ec146f 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
@@ -34,8 +34,11 @@ import org.apache.druid.metadata.DynamicConfigProvider;
 import org.apache.druid.metadata.MapStringDynamicConfigProvider;
 import org.apache.druid.segment.TestHelper;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -227,7 +230,7 @@ public class KafkaRecordSupplierTest
     );
 
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER);
+        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
 
     Assert.assertTrue(recordSupplier.getAssignment().isEmpty());
 
@@ -257,7 +260,8 @@ public class KafkaRecordSupplierTest
 
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
         properties,
-        OBJECT_MAPPER
+        OBJECT_MAPPER,
+        null
     );
 
     Assert.assertTrue(recordSupplier.getAssignment().isEmpty());
@@ -282,7 +286,8 @@ public class KafkaRecordSupplierTest
 
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
             properties,
-            OBJECT_MAPPER
+            OBJECT_MAPPER,
+            null
     );
 
     Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); //just test recordSupplier is initiated
@@ -299,7 +304,8 @@ public class KafkaRecordSupplierTest
 
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
             properties,
-            OBJECT_MAPPER
+            OBJECT_MAPPER,
+            null
     );
 
     Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); //just test recordSupplier is initiated
@@ -324,7 +330,8 @@ public class KafkaRecordSupplierTest
 
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
         properties,
-        OBJECT_MAPPER
+        OBJECT_MAPPER,
+        null
     );
 
     recordSupplier.assign(partitions);
@@ -358,7 +365,10 @@ public class KafkaRecordSupplierTest
     );
 
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER);
+        kafkaServer.consumerProperties(),
+        OBJECT_MAPPER,
+        null
+    );
 
     recordSupplier.assign(partitions);
     recordSupplier.seekToEarliest(partitions);
@@ -399,7 +409,7 @@ public class KafkaRecordSupplierTest
 
 
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER);
+        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
 
     recordSupplier.assign(partitions);
     recordSupplier.seekToEarliest(partitions);
@@ -470,7 +480,7 @@ public class KafkaRecordSupplierTest
     );
 
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER);
+        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
 
     recordSupplier.assign(partitions);
     recordSupplier.seekToEarliest(partitions);
@@ -513,7 +523,7 @@ public class KafkaRecordSupplierTest
     );
 
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER);
+        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
 
     recordSupplier.assign(partitions);
     recordSupplier.seekToEarliest(partitions);
@@ -546,7 +556,7 @@ public class KafkaRecordSupplierTest
     );
 
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER);
+        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
 
     recordSupplier.assign(partitions);
 
@@ -572,7 +582,7 @@ public class KafkaRecordSupplierTest
     );
 
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER);
+        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
 
     recordSupplier.assign(partitions);
     recordSupplier.seekToEarliest(partitions);
@@ -607,7 +617,7 @@ public class KafkaRecordSupplierTest
   public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull()
   {
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER);
+        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
     StreamPartition<Integer> streamPartition = StreamPartition.of(topic, 0);
     Set<StreamPartition<Integer>> partitions = ImmutableSet.of(streamPartition);
     recordSupplier.assign(partitions);
@@ -619,7 +629,7 @@ public class KafkaRecordSupplierTest
   public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull()
   {
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER);
+        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
     StreamPartition<Integer> streamPartition = StreamPartition.of(topic, 0);
     Set<StreamPartition<Integer>> partitions = ImmutableSet.of(streamPartition);
     recordSupplier.assign(partitions);
@@ -631,7 +641,7 @@ public class KafkaRecordSupplierTest
   public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull()
   {
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER);
+        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
     StreamPartition<Integer> streamPartition = StreamPartition.of(topic, 0);
     Set<StreamPartition<Integer>> partitions = ImmutableSet.of(streamPartition);
     recordSupplier.assign(partitions);
@@ -643,7 +653,7 @@ public class KafkaRecordSupplierTest
   public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull()
   {
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER);
+        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
     StreamPartition<Integer> streamPartition = StreamPartition.of(topic, 0);
     Set<StreamPartition<Integer>> partitions = ImmutableSet.of(streamPartition);
     recordSupplier.assign(partitions);
@@ -678,6 +688,27 @@ public class KafkaRecordSupplierTest
     Assert.assertEquals("pwd2", properties.getProperty(KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY));
   }
 
+  @Test
+  public void testUseKafkaConsumerOverrides()
+  {
+    // Close the consumer once the assertions are done so the test does not leak it.
+    try (KafkaConsumer<byte[], byte[]> kafkaConsumer = KafkaRecordSupplier.getKafkaConsumer(
+        OBJECT_MAPPER,
+        kafkaServer.consumerProperties(),
+        originalConsumerProperties -> {
+          final Map<String, Object> newMap = new HashMap<>(originalConsumerProperties);
+          newMap.put("client.id", "overrideConfigTest");
+          return newMap;
+        }
+    )) {
+      // We set a client ID via config override, it should appear in the metric name tags
+      Map<MetricName, KafkaMetric> metrics = (Map<MetricName, KafkaMetric>) kafkaConsumer.metrics();
+      Assert.assertFalse("Expected the consumer to register metrics", metrics.isEmpty());
+      MetricName metricName = metrics.keySet().iterator().next();
+      Assert.assertEquals("overrideConfigTest", metricName.tags().get("client-id"));
+    }
+  }
+
   private void insertData() throws ExecutionException, InterruptedException
   {
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
index bf5831ccae..24aa2a7dd4 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
@@ -143,6 +143,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
             null,
             null,
             null,
+            null,
             null
         ),
         null,
@@ -318,6 +319,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
             null,
             null,
             null,
+            null,
             null
         ),
         null,
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index e503d4fd7c..5b231deb31 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -304,6 +304,7 @@ public class KafkaSupervisorIOConfigTest
         new Period("PT30M"),
         null,
         null,
+        null,
         null
     );
     String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig);
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index e6a3a919b4..0e01702240 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -297,6 +297,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
             new Period("PT30M"),
             null,
             null,
+            null,
             null
     );
 
@@ -447,7 +448,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
             null,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         ),
         new KafkaIndexTaskTuningConfig(
             null,
@@ -3649,6 +3651,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
         new Period("PT30M"),
         lateMessageRejectionPeriod,
         earlyMessageRejectionPeriod,
+        null,
         null
     );
 
@@ -3761,6 +3764,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
         new Period("PT30M"),
         lateMessageRejectionPeriod,
         earlyMessageRejectionPeriod,
+        null,
         null
     );
 
@@ -3877,6 +3881,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
         new Period("PT30M"),
         lateMessageRejectionPeriod,
         earlyMessageRejectionPeriod,
+        null,
         null
     );
 
@@ -4020,7 +4025,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
             true,
             minimumMessageTime,
             maximumMessageTime,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         ),
         Collections.emptyMap(),
         OBJECT_MAPPER
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
index 854810f922..e009394226 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
@@ -53,6 +53,8 @@ public class Tasks
   public static final String FORCE_TIME_CHUNK_LOCK_KEY = "forceTimeChunkLock";
   public static final String USE_SHARED_LOCK = "useSharedLock";
   public static final String STORE_EMPTY_COLUMNS_KEY = "storeEmptyColumns";
+  public static final String DYNAMIC_CONFIG_PROVIDER_KEY = "dynamicConfigProviderKey";
+
 
   /**
    * Context flag denoting if maximum possible values should be used to estimate
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/extension/KafkaConfigOverrides.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/extension/KafkaConfigOverrides.java
new file mode 100644
index 0000000000..58a2275133
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/extension/KafkaConfigOverrides.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.extension;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.guice.annotations.ExtensionPoint;
+
+import java.util.Map;
+
+/**
+ * Allows extensions to adjust the Kafka consumer properties. Implementations are Jackson subtypes selected by
+ * name via the JSON "type" property (see the {@link JsonTypeInfo} annotation below).
+ * <p>
+ * This interface is only used by the druid-kafka-indexing-service extension, but the definition must live in a
+ * non-extension module so that other extensions can provide subtypes visible within that extension.
+ */
+@ExtensionPoint
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+public interface KafkaConfigOverrides
+{
+  /**
+   * Given a map of Kafka consumer properties, return a new potentially adjusted map of properties.
+   *
+   * @param originalConsumerProperties Kafka consumer properties before any adjustment
+   * @return adjusted copy of the Kafka consumer properties
+   */
+  Map<String, Object> overrideConfigs(Map<String, Object> originalConsumerProperties);
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org