Posted to commits@druid.apache.org by gi...@apache.org on 2019/02/18 19:50:16 UTC

[incubator-druid] branch master updated: Support kafka transactional topics (#5404) (#6496)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 80a2ef7  Support kafka transactional topics (#5404) (#6496)
80a2ef7 is described below

commit 80a2ef7be46c4fc807ac2a74507b8ba8b6a44049
Author: Surekha <su...@imply.io>
AuthorDate: Mon Feb 18 11:50:08 2019 -0800

    Support kafka transactional topics (#5404) (#6496)
    
    * Support kafka transactional topics
    
    * update kafka to version 2.0.0
    * Remove the skipOffsetGaps option since it's not used anymore
    * Adjust kafka consumer to use transactional semantics
    * Update tests
    
    * Remove unused import from test
    
    * Fix compilation
    
    * Invoke transaction api to fix a unit test
    
    * temporary modification of travis.yml for debugging
    
    * another attempt to get travis tasklogs
    
    * update kafka to 2.0.1 everywhere
    
    * Remove druid-kafka-eight dependency from integration-tests, remove the kafka firehose test and deprecate kafka-eight classes
    
    * Add deprecation notes in docs for the kafka-eight and kafka-simple extensions
    
    * Remove skipOffsetGaps and code changes for transaction support
    
    * Fix indentation
    
    * remove skipOffsetGaps from kinesis
    
    * Add transaction api to KafkaRecordSupplierTest
    
    * Fix indent
    
    * Fix test
    
    * update kafka version to 2.1.0
---
 .../development/extensions-core/kafka-ingestion.md |   1 -
 docs/content/development/extensions.md             |   4 +-
 .../kafka/KafkaEightSimpleConsumerDruidModule.java |   1 +
 .../KafkaEightSimpleConsumerFirehoseFactory.java   |   1 +
 .../druid/firehose/kafka/KafkaSimpleConsumer.java  |   1 +
 .../firehose/kafka/KafkaEightDruidModule.java      |   1 +
 .../firehose/kafka/KafkaEightFirehoseFactory.java  |   2 +
 extensions-core/kafka-indexing-service/pom.xml     |   2 +-
 .../druid/indexing/kafka/KafkaIndexTask.java       |   1 +
 .../indexing/kafka/KafkaIndexTaskIOConfig.java     |   5 +-
 .../druid/indexing/kafka/KafkaRecordSupplier.java  |   3 +-
 .../indexing/kafka/LegacyKafkaIndexTaskRunner.java |  19 +-
 .../indexing/kafka/supervisor/KafkaSupervisor.java |   3 +-
 .../kafka/supervisor/KafkaSupervisorIOConfig.java  |  13 +-
 .../druid/indexing/kafka/KafkaIOConfigTest.java    |   6 +-
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   | 676 +++++++++++----------
 .../indexing/kafka/KafkaRecordSupplierTest.java    |  85 ++-
 .../supervisor/KafkaSupervisorIOConfigTest.java    |   5 +-
 .../kafka/supervisor/KafkaSupervisorTest.java      | 105 ++--
 .../druid/indexing/kafka/test/TestBroker.java      |  11 +-
 .../indexing/kinesis/KinesisIndexTaskIOConfig.java |   1 -
 .../indexing/kinesis/KinesisIOConfigTest.java      |   2 -
 .../seekablestream/SeekableStreamIndexTask.java    |   2 +-
 .../SeekableStreamIndexTaskIOConfig.java           |   8 -
 .../SeekableStreamIndexTaskRunner.java             |  20 +-
 .../common/OrderedPartitionableRecord.java         |   2 +-
 .../seekablestream/common/RecordSupplier.java      |   2 +-
 .../supervisor/SeekableStreamSupervisor.java       |   2 +-
 integration-tests/pom.xml                          |  13 +-
 .../apache/druid/tests/indexer/ITKafkaTest.java    | 320 ----------
 .../test/resources/indexer/kafka_index_task.json   |  68 ---
 pom.xml                                            |   2 +-
 32 files changed, 511 insertions(+), 876 deletions(-)
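
Before the per-file diffs, some background on what the patch relies on: since Kafka 0.11, a producer can group sends into a transaction, and a consumer configured with isolation.level=read_committed sees those records only once the transaction commits. The following standalone sketch is purely illustrative and is not code from this commit; the bootstrap address, transactional id, and topic name are placeholder assumptions.

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    import java.nio.charset.StandardCharsets;
    import java.util.Properties;

    public class TransactionalProducerSketch
    {
      public static void main(String[] args)
      {
        final Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.setProperty("transactional.id", "example-txn-id");  // placeholder; required for transactions
        props.setProperty("key.serializer", ByteArraySerializer.class.getName());
        props.setProperty("value.serializer", ByteArraySerializer.class.getName());

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
          producer.initTransactions();
          producer.beginTransaction();
          try {
            producer.send(new ProducerRecord<>("example-topic", null, "row-1".getBytes(StandardCharsets.UTF_8)));
            producer.commitTransaction(); // records become visible to read_committed consumers
          }
          catch (KafkaException e) {
            producer.abortTransaction();  // aborted records are never delivered to read_committed consumers
            throw e;
          }
        }
      }
    }

The test changes further down follow exactly this initTransactions/beginTransaction/commitTransaction pattern.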

diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md
index b24e874..85cebc5 100644
--- a/docs/content/development/extensions-core/kafka-ingestion.md
+++ b/docs/content/development/extensions-core/kafka-ingestion.md
@@ -201,7 +201,6 @@ For Roaring bitmaps:
 |`completionTimeout`|ISO8601 Period|The length of time to wait before declaring a publishing task as failed and terminating it. If this is set too low, your tasks may never publish. The publishing clock for a task begins roughly after `taskDuration` elapses.|no (default == PT30M)|
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime an [...]
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageReject [...]
-|`skipOffsetGaps`|Boolean|Whether or not to allow gaps of missing offsets in the Kafka stream. This is required for compatibility with implementations such as MapR Streams which does not guarantee consecutive offsets. If this is false, an exception will be thrown if offsets are not consecutive.|no (default == false)|
 
 ## Operations
 
diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md
index a5b8a5b..a6d3a7b 100644
--- a/docs/content/development/extensions.md
+++ b/docs/content/development/extensions.md
@@ -48,7 +48,7 @@ Core extensions are maintained by Druid committers.
 |druid-datasketches|Support for approximate counts and set operations with [DataSketches](http://datasketches.github.io/).|[link](../development/extensions-core/datasketches-extension.html)|
 |druid-hdfs-storage|HDFS deep storage.|[link](../development/extensions-core/hdfs.html)|
 |druid-histogram|Approximate histograms and quantiles aggregator.|[link](../development/extensions-core/approximate-histograms.html)|
-|druid-kafka-eight|Kafka ingest firehose (high level consumer) for realtime nodes.|[link](../development/extensions-core/kafka-eight-firehose.html)|
+|druid-kafka-eight|Kafka ingest firehose (high level consumer) for realtime nodes (deprecated).|[link](../development/extensions-core/kafka-eight-firehose.html)|
 |druid-kafka-extraction-namespace|Kafka-based namespaced lookup. Requires namespace lookup extension.|[link](../development/extensions-core/kafka-extraction-namespace.html)|
 |druid-kafka-indexing-service|Supervised exactly-once Kafka ingestion for the indexing service.|[link](../development/extensions-core/kafka-ingestion.html)|
 |druid-kinesis-indexing-service|Supervised exactly-once Kinesis ingestion for the indexing service.|[link](../development/extensions-core/kinesis-ingestion.html)|
@@ -81,7 +81,7 @@ All of these community extensions can be downloaded using *pull-deps* with the c
 |druid-cassandra-storage|Apache Cassandra deep storage.|[link](../development/extensions-contrib/cassandra.html)|
 |druid-cloudfiles-extensions|Rackspace Cloudfiles deep storage and firehose.|[link](../development/extensions-contrib/cloudfiles.html)|
 |druid-distinctcount|DistinctCount aggregator|[link](../development/extensions-contrib/distinctcount.html)|
-|druid-kafka-eight-simpleConsumer|Kafka ingest firehose (low level consumer).|[link](../development/extensions-contrib/kafka-simple.html)|
+|druid-kafka-eight-simpleConsumer|Kafka ingest firehose (low level consumer) (deprecated).|[link](../development/extensions-contrib/kafka-simple.html)|
 |druid-orc-extensions|Support for data in Apache Orc data format.|[link](../development/extensions-contrib/orc.html)|
 |druid-rabbitmq|RabbitMQ firehose.|[link](../development/extensions-contrib/rabbitmq.html)|
 |druid-redis-cache|A cache implementation for Druid based on Redis.|[link](../development/extensions-contrib/redis-cache.html)|
diff --git a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaEightSimpleConsumerDruidModule.java b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaEightSimpleConsumerDruidModule.java
index a03f320..8fad815 100644
--- a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaEightSimpleConsumerDruidModule.java
+++ b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaEightSimpleConsumerDruidModule.java
@@ -28,6 +28,7 @@ import org.apache.druid.initialization.DruidModule;
 
 import java.util.List;
 
+@Deprecated
 public class KafkaEightSimpleConsumerDruidModule implements DruidModule
 {
   @Override
diff --git a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java
index 65c501a..ca34e55 100644
--- a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java
+++ b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java
@@ -46,6 +46,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+@Deprecated
 public class KafkaEightSimpleConsumerFirehoseFactory implements
     FirehoseFactoryV2<ByteBufferInputRowParser>
 {
diff --git a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaSimpleConsumer.java b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaSimpleConsumer.java
index 038fb2d..25fc8de 100644
--- a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaSimpleConsumer.java
+++ b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaSimpleConsumer.java
@@ -56,6 +56,7 @@ import java.util.concurrent.TimeUnit;
  * This class is not thread safe, the caller must ensure all the methods be
  * called from single thread
  */
+@Deprecated
 public class KafkaSimpleConsumer
 {
 
diff --git a/extensions-core/kafka-eight/src/main/java/org/apache/druid/firehose/kafka/KafkaEightDruidModule.java b/extensions-core/kafka-eight/src/main/java/org/apache/druid/firehose/kafka/KafkaEightDruidModule.java
index 4fe379a..f8a5ad7 100644
--- a/extensions-core/kafka-eight/src/main/java/org/apache/druid/firehose/kafka/KafkaEightDruidModule.java
+++ b/extensions-core/kafka-eight/src/main/java/org/apache/druid/firehose/kafka/KafkaEightDruidModule.java
@@ -30,6 +30,7 @@ import java.util.List;
 
 /**
  */
+@Deprecated
 public class KafkaEightDruidModule implements DruidModule
 {
   @Override
diff --git a/extensions-core/kafka-eight/src/main/java/org/apache/druid/firehose/kafka/KafkaEightFirehoseFactory.java b/extensions-core/kafka-eight/src/main/java/org/apache/druid/firehose/kafka/KafkaEightFirehoseFactory.java
index 0d0ed58..bb38c05 100644
--- a/extensions-core/kafka-eight/src/main/java/org/apache/druid/firehose/kafka/KafkaEightFirehoseFactory.java
+++ b/extensions-core/kafka-eight/src/main/java/org/apache/druid/firehose/kafka/KafkaEightFirehoseFactory.java
@@ -47,7 +47,9 @@ import java.util.Properties;
 import java.util.Set;
 
 /**
+ * This class is deprecated and the kafka-eight module should be removed completely.
  */
+@Deprecated
 public class KafkaEightFirehoseFactory implements FirehoseFactory<InputRowParser<ByteBuffer>>
 {
   private static final Logger log = new Logger(KafkaEightFirehoseFactory.class);
diff --git a/extensions-core/kafka-indexing-service/pom.xml b/extensions-core/kafka-indexing-service/pom.xml
index c48091f..b7c7816 100644
--- a/extensions-core/kafka-indexing-service/pom.xml
+++ b/extensions-core/kafka-indexing-service/pom.xml
@@ -34,7 +34,7 @@
   </parent>
 
   <properties>
-    <apache.kafka.version>0.10.2.2</apache.kafka.version>
+    <apache.kafka.version>2.1.0</apache.kafka.version>
   </properties>
 
   <dependencies>
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index 950441c..4fcbbe1 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -108,6 +108,7 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long>
       props.setProperty("auto.offset.reset", "none");
       props.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
       props.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
+      props.setProperty("isolation.level", "read_committed");
 
       return new KafkaConsumer<>(props);
     }
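
For context on the isolation.level property added above: read_committed makes the Kafka 2.x consumer return only records from committed transactions and hides transaction control markers entirely, which is why the offset-gap tolerance removed elsewhere in this patch is no longer needed. A minimal sketch of such a consumer, with all connection settings as placeholder assumptions:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    public class ReadCommittedConsumerSketch
    {
      public static void main(String[] args)
      {
        final Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.setProperty("group.id", "example-group");           // placeholder group id
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
        props.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
        props.setProperty("isolation.level", "read_committed");   // only committed transactional data is returned

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
          consumer.assign(Collections.singletonList(new TopicPartition("example-topic", 0)));
          for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(1))) {
            // Offsets may be non-consecutive: transaction markers and aborted
            // records occupy offsets but are never handed to the application.
            System.out.printf("offset=%d%n", record.offset());
          }
        }
      }
    }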
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
index af84bfc..5f36816 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
@@ -45,8 +45,7 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
       @JsonProperty("pollTimeout") Long pollTimeout,
       @JsonProperty("useTransaction") Boolean useTransaction,
       @JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
-      @JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
-      @JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps
+      @JsonProperty("maximumMessageTime") DateTime maximumMessageTime
   )
   {
     super(
@@ -57,7 +56,6 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
         useTransaction,
         minimumMessageTime,
         maximumMessageTime,
-        skipOffsetGaps,
         null
     );
 
@@ -100,7 +98,6 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
            ", useTransaction=" + isUseTransaction() +
            ", minimumMessageTime=" + getMinimumMessageTime() +
            ", maximumMessageTime=" + getMaximumMessageTime() +
-           ", skipOffsetGaps=" + isSkipOffsetGaps() +
            '}';
   }
 }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
index 935404c..6c3d053 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 
 import javax.annotation.Nonnull;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -110,7 +111,7 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
   public List<OrderedPartitionableRecord<Integer, Long>> poll(long timeout)
   {
     List<OrderedPartitionableRecord<Integer, Long>> polledRecords = new ArrayList<>();
-    for (ConsumerRecord<byte[], byte[]> record : consumer.poll(timeout)) {
+    for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(timeout))) {
       polledRecords.add(new OrderedPartitionableRecord<>(
           record.topic(),
           record.partition(),
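
The one-line change above tracks Kafka 2.0's KIP-266, which deprecated KafkaConsumer.poll(long) in favor of poll(Duration); the Duration overload also bounds time spent blocked on metadata fetches instead of potentially waiting indefinitely. A small sketch of the migration, assuming the consumer is supplied by the caller:

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.time.Duration;

    final class PollMigrationSketch
    {
      // Kafka 2.x style: poll(Duration) replaces the deprecated poll(long).
      static ConsumerRecords<byte[], byte[]> pollOnce(KafkaConsumer<byte[], byte[]> consumer, long timeoutMillis)
      {
        // Pre-2.0 equivalent (deprecated): consumer.poll(timeoutMillis)
        return consumer.poll(Duration.ofMillis(timeoutMillis));
      }
    }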
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
index d081a0e..c82700d 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
@@ -413,23 +413,6 @@ public class LegacyKafkaIndexTaskRunner extends SeekableStreamIndexTaskRunner<In
             }
 
             if (record.offset() < endOffsets.get(record.partition())) {
-              if (record.offset() != nextOffsets.get(record.partition())) {
-                if (ioConfig.isSkipOffsetGaps()) {
-                  log.warn(
-                      "Skipped to offset[%,d] after offset[%,d] in partition[%d].",
-                      record.offset(),
-                      nextOffsets.get(record.partition()),
-                      record.partition()
-                  );
-                } else {
-                  throw new ISE(
-                      "WTF?! Got offset[%,d] after offset[%,d] in partition[%d].",
-                      record.offset(),
-                      nextOffsets.get(record.partition()),
-                      record.partition()
-                  );
-                }
-              }
 
               try {
                 final byte[] valueBytes = record.value();
@@ -489,7 +472,7 @@ public class LegacyKafkaIndexTaskRunner extends SeekableStreamIndexTaskRunner<In
               nextOffsets.put(record.partition(), record.offset() + 1);
             }
 
-            if (nextOffsets.get(record.partition()).equals(endOffsets.get(record.partition()))
+            if (nextOffsets.get(record.partition()) >= endOffsets.get(record.partition())
                 && assignment.remove(record.partition())) {
               log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition());
               KafkaIndexTask.assignPartitions(consumer, topic, assignment);
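
The switch above from an equality test to >= matters because transactional topics no longer have consecutive offsets: commit/abort control markers occupy offsets but are never delivered, so the next offset to read can jump past the configured end offset without ever equaling it. A tiny illustrative sketch of the revised check:

    final class EndOffsetCheckSketch
    {
      // Illustrative only: with transaction markers in the stream, nextOffset
      // can land beyond endOffset, so an equality test could be skipped over.
      static boolean reachedEnd(long nextOffset, long endOffset)
      {
        return nextOffset >= endOffset;
      }
    }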
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 0e9dcac..bb388cd 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -212,8 +212,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
         kafkaIoConfig.getPollTimeout(),
         true,
         minimumMessageTime,
-        maximumMessageTime,
-        kafkaIoConfig.isSkipOffsetGaps()
+        maximumMessageTime
     );
   }
 
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
index ddd0f06..629daa7 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
@@ -38,7 +38,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
 
   private final Map<String, Object> consumerProperties;
   private final long pollTimeout;
-  private final boolean skipOffsetGaps;
+
 
   @JsonCreator
   public KafkaSupervisorIOConfig(
@@ -53,8 +53,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
       @JsonProperty("useEarliestOffset") Boolean useEarliestOffset,
       @JsonProperty("completionTimeout") Period completionTimeout,
       @JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod,
-      @JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod,
-      @JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps
+      @JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod
   )
   {
     super(
@@ -76,7 +75,6 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
         StringUtils.format("consumerProperties must contain entry for [%s]", BOOTSTRAP_SERVERS_KEY)
     );
     this.pollTimeout = pollTimeout != null ? pollTimeout : DEFAULT_POLL_TIMEOUT_MILLIS;
-    this.skipOffsetGaps = skipOffsetGaps != null ? skipOffsetGaps : false;
   }
 
   @JsonProperty
@@ -103,12 +101,6 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
     return isUseEarliestSequenceNumber();
   }
 
-  @JsonProperty
-  public boolean isSkipOffsetGaps()
-  {
-    return skipOffsetGaps;
-  }
-
   @Override
   public String toString()
   {
@@ -125,7 +117,6 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
            ", completionTimeout=" + getCompletionTimeout() +
            ", earlyMessageRejectionPeriod=" + getEarlyMessageRejectionPeriod() +
            ", lateMessageRejectionPeriod=" + getLateMessageRejectionPeriod() +
-           ", skipOffsetGaps=" + skipOffsetGaps +
            '}';
   }
 
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java
index 7ce8df0..556cba1 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java
@@ -77,7 +77,6 @@ public class KafkaIOConfigTest
     Assert.assertTrue(config.isUseTransaction());
     Assert.assertFalse("minimumMessageTime", config.getMinimumMessageTime().isPresent());
     Assert.assertFalse("maximumMessageTime", config.getMaximumMessageTime().isPresent());
-    Assert.assertFalse("skipOffsetGaps", config.isSkipOffsetGaps());
     Assert.assertEquals(Collections.EMPTY_SET, config.getExclusiveStartSequenceNumberPartitions());
   }
 
@@ -93,8 +92,7 @@ public class KafkaIOConfigTest
                      + "  \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
                      + "  \"useTransaction\": false,\n"
                      + "  \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
-                     + "  \"maximumMessageTime\": \"2016-05-31T14:00Z\",\n"
-                     + "  \"skipOffsetGaps\": true\n"
+                     + "  \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
                      + "}";
 
     KafkaIndexTaskIOConfig config = (KafkaIndexTaskIOConfig) mapper.readValue(
@@ -115,9 +113,7 @@ public class KafkaIOConfigTest
     Assert.assertFalse(config.isUseTransaction());
     Assert.assertEquals(DateTimes.of("2016-05-31T12:00Z"), config.getMinimumMessageTime().get());
     Assert.assertEquals(DateTimes.of("2016-05-31T14:00Z"), config.getMaximumMessageTime().get());
-    Assert.assertTrue("skipOffsetGaps", config.isSkipOffsetGaps());
     Assert.assertEquals(Collections.EMPTY_SET, config.getExclusiveStartSequenceNumberPartitions());
-
   }
 
   @Test
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index b7b3896..3d6308c 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -99,14 +99,16 @@ import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
 import org.apache.druid.metadata.EntryExistsException;
 import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
 import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
 import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
 import org.apache.druid.query.Query;
-import org.apache.druid.query.QueryPlus;
 import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.QueryToolChest;
+import org.apache.druid.query.QueryWatcher;
 import org.apache.druid.query.Result;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -114,6 +116,13 @@ import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.query.scan.ScanQueryConfig;
+import org.apache.druid.query.scan.ScanQueryEngine;
+import org.apache.druid.query.scan.ScanQueryQueryToolChest;
+import org.apache.druid.query.scan.ScanQueryRunnerFactory;
+import org.apache.druid.query.scan.ScanResultValue;
+import org.apache.druid.query.spec.QuerySegmentSpec;
 import org.apache.druid.query.timeseries.TimeseriesQuery;
 import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
 import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
@@ -172,11 +181,14 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.druid.query.QueryPlus.wrap;
+
 @RunWith(Parameterized.class)
 public class KafkaIndexTaskTest
 {
@@ -372,11 +384,7 @@ public class KafkaIndexTaskTest
   public void testRunAfterDataInserted() throws Exception
   {
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();
 
     final KafkaIndexTask task = createTask(
         null,
@@ -389,8 +397,7 @@ public class KafkaIndexTaskTest
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
-            null,
-            false
+            null
         )
     );
 
@@ -432,8 +439,7 @@ public class KafkaIndexTaskTest
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
-            null,
-            false
+            null
         )
     );
 
@@ -445,11 +451,7 @@ public class KafkaIndexTaskTest
     }
 
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();
 
     // Wait for task to exit
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
@@ -484,11 +486,7 @@ public class KafkaIndexTaskTest
     maxRowsPerSegment = 2;
 
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();
     Map<String, Object> consumerProps = kafkaServer.consumerProperties();
     consumerProps.put("max.poll.records", "1");
 
@@ -541,8 +539,7 @@ public class KafkaIndexTaskTest
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
-            null,
-            false
+            null
         )
     );
     final ListenableFuture<TaskStatus> future = runTask(task);
@@ -614,141 +611,148 @@ public class KafkaIndexTaskTest
     int numToAdd = records.size() - 2;
 
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
       for (int i = 0; i < numToAdd; i++) {
         kafkaProducer.send(records.get(i)).get();
       }
+      kafkaProducer.commitTransaction();
+    }
 
-      Map<String, Object> consumerProps = kafkaServer.consumerProperties();
-      consumerProps.put("max.poll.records", "1");
+    Map<String, Object> consumerProps = kafkaServer.consumerProperties();
+    consumerProps.put("max.poll.records", "1");
 
-      final SeekableStreamPartitions<Integer, Long> startPartitions = new SeekableStreamPartitions<>(
-          topic,
-          ImmutableMap.of(
-              0,
-              0L,
-              1,
-              0L
-          )
-      );
-      final SeekableStreamPartitions<Integer, Long> checkpoint1 = new SeekableStreamPartitions<>(
-          topic,
-          ImmutableMap.of(
-              0,
-              3L,
-              1,
-              0L
-          )
-      );
-      final SeekableStreamPartitions<Integer, Long> checkpoint2 = new SeekableStreamPartitions<>(
-          topic,
-          ImmutableMap.of(
-              0,
-              10L,
-              1,
-              0L
-          )
-      );
+    final SeekableStreamPartitions<Integer, Long> startPartitions = new SeekableStreamPartitions<>(
+        topic,
+        ImmutableMap.of(
+            0,
+            0L,
+            1,
+            0L
+        )
+    );
+    final SeekableStreamPartitions<Integer, Long> checkpoint1 = new SeekableStreamPartitions<>(
+        topic,
+        ImmutableMap.of(
+            0,
+            3L,
+            1,
+            0L
+        )
+    );
+    final SeekableStreamPartitions<Integer, Long> checkpoint2 = new SeekableStreamPartitions<>(
+        topic,
+        ImmutableMap.of(
+            0,
+            10L,
+            1,
+            0L
+        )
+    );
 
-      final SeekableStreamPartitions<Integer, Long> endPartitions = new SeekableStreamPartitions<>(
-          topic,
-          ImmutableMap.of(
-              0,
-              10L,
-              1,
-              2L
-          )
-      );
-      final KafkaIndexTask task = createTask(
-          null,
-          new KafkaIndexTaskIOConfig(
-              0,
-              baseSequenceName,
-              startPartitions,
-              endPartitions,
-              consumerProps,
-              KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
-              true,
-              null,
-              null,
-              false
-          )
-      );
-      final ListenableFuture<TaskStatus> future = runTask(task);
-      while (task.getRunner().getStatus() != Status.PAUSED) {
-        Thread.sleep(10);
-      }
-      final Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
+    final SeekableStreamPartitions<Integer, Long> endPartitions = new SeekableStreamPartitions<>(
+        topic,
+        ImmutableMap.of(
+            0,
+            10L,
+            1,
+            2L
+        )
+    );
+    final KafkaIndexTask task = createTask(
+        null,
+        new KafkaIndexTaskIOConfig(
+            0,
+            baseSequenceName,
+            startPartitions,
+            endPartitions,
+            consumerProps,
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null
+        )
+    );
+    final ListenableFuture<TaskStatus> future = runTask(task);
+    while (task.getRunner().getStatus() != Status.PAUSED) {
+      Thread.sleep(10);
+    }
+    final Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
 
-      Assert.assertTrue(checkpoint1.getPartitionSequenceNumberMap().equals(currentOffsets));
-      task.getRunner().setEndOffsets(currentOffsets, false);
+    Assert.assertTrue(checkpoint1.getPartitionSequenceNumberMap().equals(currentOffsets));
+    task.getRunner().setEndOffsets(currentOffsets, false);
 
-      while (task.getRunner().getStatus() != Status.PAUSED) {
-        Thread.sleep(10);
-      }
+    while (task.getRunner().getStatus() != Status.PAUSED) {
+      Thread.sleep(10);
+    }
 
-      // add remaining records
+    // add remaining records
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
       for (int i = numToAdd; i < records.size(); i++) {
         kafkaProducer.send(records.get(i)).get();
       }
-      final Map<Integer, Long> nextOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
-
-      Assert.assertTrue(checkpoint2.getPartitionSequenceNumberMap().equals(nextOffsets));
-      task.getRunner().setEndOffsets(nextOffsets, false);
-
-      Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
-
-      Assert.assertEquals(2, checkpointRequestsHash.size());
-      Assert.assertTrue(
-          checkpointRequestsHash.contains(
-              Objects.hash(
-                  DATA_SCHEMA.getDataSource(),
-                  0,
-                  new KafkaDataSourceMetadata(startPartitions),
-                  new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, currentOffsets))
-              )
-          )
-      );
-      Assert.assertTrue(
-          checkpointRequestsHash.contains(
-              Objects.hash(
-                  DATA_SCHEMA.getDataSource(),
-                  0,
-                  new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, currentOffsets)),
-                  new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, nextOffsets))
-              )
-          )
-      );
+      kafkaProducer.commitTransaction();
+    }
+    final Map<Integer, Long> nextOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
 
-      // Check metrics
-      Assert.assertEquals(8, task.getRunner().getRowIngestionMeters().getProcessed());
-      Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getUnparseable());
-      Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getThrownAway());
-
-      // Check published metadata
-      SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
-      SegmentDescriptor desc2 = SD(task, "2009/P1D", 0);
-      SegmentDescriptor desc3 = SD(task, "2010/P1D", 0);
-      SegmentDescriptor desc4 = SD(task, "2011/P1D", 0);
-      SegmentDescriptor desc5 = SD(task, "2011/P1D", 1);
-      SegmentDescriptor desc6 = SD(task, "2012/P1D", 0);
-      SegmentDescriptor desc7 = SD(task, "2013/P1D", 0);
-      Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
-      Assert.assertEquals(
-          new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L, 1, 2L))),
-          metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
-      );
+    Assert.assertTrue(checkpoint2.getPartitionSequenceNumberMap().equals(nextOffsets));
+    task.getRunner().setEndOffsets(nextOffsets, false);
 
-      // Check segments in deep storage
-      Assert.assertEquals(ImmutableList.of("a"), readSegmentColumn("dim1", desc1));
-      Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", desc2));
-      Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc3));
-      Assert.assertTrue((ImmutableList.of("d", "e").equals(readSegmentColumn("dim1", desc4))
-                         && ImmutableList.of("h").equals(readSegmentColumn("dim1", desc5))) ||
-                        (ImmutableList.of("d", "h").equals(readSegmentColumn("dim1", desc4))
-                         && ImmutableList.of("e").equals(readSegmentColumn("dim1", desc5))));
-      Assert.assertEquals(ImmutableList.of("g"), readSegmentColumn("dim1", desc6));
-      Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc7));
-    }
+    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+
+    Assert.assertEquals(2, checkpointRequestsHash.size());
+    Assert.assertTrue(
+        checkpointRequestsHash.contains(
+            Objects.hash(
+                DATA_SCHEMA.getDataSource(),
+                0,
+                new KafkaDataSourceMetadata(startPartitions),
+                new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, currentOffsets))
+            )
+        )
+    );
+    Assert.assertTrue(
+        checkpointRequestsHash.contains(
+            Objects.hash(
+                DATA_SCHEMA.getDataSource(),
+                0,
+                new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, currentOffsets)),
+                new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, nextOffsets))
+            )
+        )
+    );
+
+    // Check metrics
+    Assert.assertEquals(8, task.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getUnparseable());
+    Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getThrownAway());
+
+    // Check published metadata
+    SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
+    SegmentDescriptor desc2 = SD(task, "2009/P1D", 0);
+    SegmentDescriptor desc3 = SD(task, "2010/P1D", 0);
+    SegmentDescriptor desc4 = SD(task, "2011/P1D", 0);
+    SegmentDescriptor desc5 = SD(task, "2011/P1D", 1);
+    SegmentDescriptor desc6 = SD(task, "2012/P1D", 0);
+    SegmentDescriptor desc7 = SD(task, "2013/P1D", 0);
+    Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
+    Assert.assertEquals(
+        new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L, 1, 2L))),
+        metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
+    );
+
+    // Check segments in deep storage
+    Assert.assertEquals(ImmutableList.of("a"), readSegmentColumn("dim1", desc1));
+    Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", desc2));
+    Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc3));
+    Assert.assertTrue((ImmutableList.of("d", "e").equals(readSegmentColumn("dim1", desc4))
+                       && ImmutableList.of("h").equals(readSegmentColumn("dim1", desc5))) ||
+                      (ImmutableList.of("d", "h").equals(readSegmentColumn("dim1", desc4))
+                       && ImmutableList.of("e").equals(readSegmentColumn("dim1", desc5))));
+    Assert.assertEquals(ImmutableList.of("g"), readSegmentColumn("dim1", desc6));
+    Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc7));
   }
 
   @Test(timeout = 60_000L)
@@ -763,11 +767,7 @@ public class KafkaIndexTaskTest
     intermediateHandoffPeriod = new Period().withSeconds(0);
 
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records.subList(0, 2)) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();
     Map<String, Object> consumerProps = kafkaServer.consumerProperties();
     consumerProps.put("max.poll.records", "1");
 
@@ -810,8 +810,7 @@ public class KafkaIndexTaskTest
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
-            null,
-            false
+            null
         )
     );
     final ListenableFuture<TaskStatus> future = runTask(task);
@@ -882,9 +881,12 @@ public class KafkaIndexTaskTest
 
     // Insert data
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
       for (ProducerRecord<byte[], byte[]> record : records) {
         kafkaProducer.send(record).get();
       }
+      kafkaProducer.commitTransaction();
     }
     Map<String, Object> consumerProps = kafkaServer.consumerProperties();
     consumerProps.put("max.poll.records", "1");
@@ -913,8 +915,7 @@ public class KafkaIndexTaskTest
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
-            null,
-            false
+            null
         )
     );
     final ListenableFuture<TaskStatus> future = runTask(task);
@@ -950,8 +951,7 @@ public class KafkaIndexTaskTest
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             DateTimes.of("2010"),
-            null,
-            false
+            null
         )
     );
 
@@ -963,11 +963,7 @@ public class KafkaIndexTaskTest
     }
 
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();
 
     // Wait for task to exit
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
@@ -1005,8 +1001,7 @@ public class KafkaIndexTaskTest
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
-            DateTimes.of("2010"),
-            false
+            DateTimes.of("2010")
         )
     );
 
@@ -1018,11 +1013,7 @@ public class KafkaIndexTaskTest
     }
 
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();
 
     // Wait for task to exit
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
@@ -1070,8 +1061,7 @@ public class KafkaIndexTaskTest
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
-            null,
-            false
+            null
         )
     );
 
@@ -1083,11 +1073,7 @@ public class KafkaIndexTaskTest
     }
 
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();
 
     // Wait for task to exit
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
@@ -1114,11 +1100,7 @@ public class KafkaIndexTaskTest
   public void testRunOnNothing() throws Exception
   {
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();
 
     final KafkaIndexTask task = createTask(
         null,
@@ -1131,8 +1113,7 @@ public class KafkaIndexTaskTest
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
-            null,
-            false
+            null
         )
     );
 
@@ -1156,11 +1137,7 @@ public class KafkaIndexTaskTest
     handoffConditionTimeout = 5_000;
 
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();
 
     final KafkaIndexTask task = createTask(
         null,
@@ -1173,8 +1150,7 @@ public class KafkaIndexTaskTest
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
-            null,
-            false
+            null
         )
     );
 
@@ -1209,11 +1185,7 @@ public class KafkaIndexTaskTest
     handoffConditionTimeout = 100;
 
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();
 
     final KafkaIndexTask task = createTask(
         null,
@@ -1226,8 +1198,7 @@ public class KafkaIndexTaskTest
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
-            null,
-            false
+            null
         )
     );
 
@@ -1265,11 +1236,7 @@ public class KafkaIndexTaskTest
     maxSavedParseExceptions = 2;
 
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();
 
     final KafkaIndexTask task = createTask(
         null,
@@ -1282,8 +1249,7 @@ public class KafkaIndexTaskTest
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
-            null,
-            false
+            null
         )
     );
 
@@ -1310,11 +1276,7 @@ public class KafkaIndexTaskTest
     maxSavedParseExceptions = 6;
 
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();
 
     final KafkaIndexTask task = createTask(
         null,
@@ -1327,8 +1289,7 @@ public class KafkaIndexTaskTest
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
-            null,
-            false
+            null
         )
     );
 
@@ -1393,11 +1354,7 @@ public class KafkaIndexTaskTest
     maxSavedParseExceptions = 2;
 
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();
 
     final KafkaIndexTask task = createTask(
         null,
@@ -1410,8 +1367,7 @@ public class KafkaIndexTaskTest
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
-            null,
-            false
+            null
         )
     );
 
@@ -1471,8 +1427,7 @@ public class KafkaIndexTaskTest
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
-            null,
-            false
+            null
         )
     );
     final KafkaIndexTask task2 = createTask(
@@ -1486,8 +1441,7 @@ public class KafkaIndexTaskTest
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
-            null,
-            false
+            null
         )
     );
 
@@ -1495,11 +1449,7 @@ public class KafkaIndexTaskTest
     final ListenableFuture<TaskStatus> future2 = runTask(task2);
 
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();
 
     // Wait for tasks to exit
     Assert.assertEquals(TaskState.SUCCESS, future1.get().getStatusCode());
@@ -1541,8 +1491,7 @@ public class KafkaIndexTaskTest
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
-            null,
-            false
+            null
         )
     );
     final KafkaIndexTask task2 = createTask(
@@ -1556,17 +1505,12 @@ public class KafkaIndexTaskTest
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
-            null,
-            false
+            null
         )
     );
 
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();
 
     // Run first task
     final ListenableFuture<TaskStatus> future1 = runTask(task1);
@@ -1612,8 +1556,7 @@ public class KafkaIndexTaskTest
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             false,
             null,
-            null,
-            false
+            null
         )
     );
     final KafkaIndexTask task2 = createTask(
@@ -1627,17 +1570,12 @@ public class KafkaIndexTaskTest
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             false,
             null,
-            null,
-            false
+            null
         )
     );
 
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();
 
     // Run first task
     final ListenableFuture<TaskStatus> future1 = runTask(task1);
@@ -1688,20 +1626,14 @@ public class KafkaIndexTaskTest
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
-            null,
-            false
+            null
         )
     );
 
     final ListenableFuture<TaskStatus> future = runTask(task);
 
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-      kafkaProducer.flush();
-    }
+    insertData();
 
     // Wait for tasks to exit
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
@@ -1754,8 +1686,7 @@ public class KafkaIndexTaskTest
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
-            null,
-            false
+            null
         )
     );
     final KafkaIndexTask task2 = createTask(
@@ -1769,8 +1700,7 @@ public class KafkaIndexTaskTest
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
-            null,
-            false
+            null
         )
     );
 
@@ -1778,11 +1708,7 @@ public class KafkaIndexTaskTest
     final ListenableFuture<TaskStatus> future2 = runTask(task2);
 
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();
 
     // Wait for tasks to exit
     Assert.assertEquals(TaskState.SUCCESS, future1.get().getStatusCode());
@@ -1821,13 +1747,12 @@ public class KafkaIndexTaskTest
             0,
             "sequence0",
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
-            new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
+            new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 6L)),
             kafkaServer.consumerProperties(),
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
-            null,
-            false
+            null
         )
     );
 
@@ -1835,9 +1760,12 @@ public class KafkaIndexTaskTest
 
     // Insert some data, but not enough for the task to finish
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
       for (ProducerRecord<byte[], byte[]> record : Iterables.limit(records, 4)) {
         kafkaProducer.send(record).get();
       }
+      kafkaProducer.commitTransaction();
     }
 
     while (countEvents(task1) != 2) {
@@ -1859,13 +1787,12 @@ public class KafkaIndexTaskTest
             0,
             "sequence0",
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
-            new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
+            new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 6L)),
             kafkaServer.consumerProperties(),
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
-            null,
-            false
+            null
         )
     );
 
@@ -1873,12 +1800,16 @@ public class KafkaIndexTaskTest
 
     // Insert remaining data
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
       for (ProducerRecord<byte[], byte[]> record : Iterables.skip(records, 4)) {
         kafkaProducer.send(record).get();
       }
+      kafkaProducer.commitTransaction();
     }
 
     // Wait for task to exit
+
     Assert.assertEquals(TaskState.SUCCESS, future2.get().getStatusCode());
 
     // Check metrics
@@ -1894,7 +1825,7 @@ public class KafkaIndexTaskTest
     SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0);
     Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
     Assert.assertEquals(
-        new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L))),
+        new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 6L))),
         metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
     );
 
@@ -1912,13 +1843,12 @@ public class KafkaIndexTaskTest
             0,
             "sequence0",
             new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)),
-            new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)),
+            new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 6L)),
             kafkaServer.consumerProperties(),
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
-            null,
-            false
+            null
         )
     );
 
@@ -1926,10 +1856,13 @@ public class KafkaIndexTaskTest
 
     // Insert some data, but not enough for the task to finish
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
       for (ProducerRecord<byte[], byte[]> record : Iterables.limit(records, 4)) {
         kafkaProducer.send(record).get();
       }
       kafkaProducer.flush();
+      kafkaProducer.commitTransaction();
     }
 
     while (countEvents(task) != 2) {
@@ -1946,12 +1879,14 @@ public class KafkaIndexTaskTest
         }
     );
     Assert.assertEquals(Status.PAUSED, task.getRunner().getStatus());
-
     // Insert remaining data
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
       for (ProducerRecord<byte[], byte[]> record : Iterables.skip(records, 4)) {
         kafkaProducer.send(record).get();
       }
+      kafkaProducer.commitTransaction();
     }
 
     try {
@@ -1979,7 +1914,7 @@ public class KafkaIndexTaskTest
     SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
     Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
     Assert.assertEquals(
-        new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L))),
+        new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 6L))),
         metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
     );
 
@@ -2002,8 +1937,7 @@ public class KafkaIndexTaskTest
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
-            null,
-            false
+            null
         )
     );
 
@@ -2025,11 +1959,7 @@ public class KafkaIndexTaskTest
   {
     resetOffsetAutomatically = true;
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();
 
     final KafkaIndexTask task = createTask(
         null,
@@ -2042,8 +1972,7 @@ public class KafkaIndexTaskTest
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
-            null,
-            false
+            null
         )
     );
 
@@ -2070,11 +1999,7 @@ public class KafkaIndexTaskTest
       return;
     }
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();
 
     final TreeMap<Integer, Map<Integer, Long>> sequences = new TreeMap<>();
     // Here the sequence number is 1 meaning that one incremental handoff was done by the failed task
@@ -2097,8 +2022,7 @@ public class KafkaIndexTaskTest
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
-            null,
-            false
+            null
         ),
         context
     );
@@ -2131,11 +2055,7 @@ public class KafkaIndexTaskTest
   public void testRunWithDuplicateRequest() throws Exception
   {
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();
 
     final KafkaIndexTask task = createTask(
         null,
@@ -2148,8 +2068,7 @@ public class KafkaIndexTaskTest
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             null,
-            null,
-            false
+            null
         )
     );
 
@@ -2170,6 +2089,134 @@ public class KafkaIndexTaskTest
     Assert.assertEquals(Status.READING, task.getRunner().getStatus());
   }
 
+  @Test(timeout = 60_000L)
+  public void testRunTransactionModeRollback() throws Exception
+  {
+    final KafkaIndexTask task = createTask(
+        null,
+        new KafkaIndexTaskIOConfig(
+            0,
+            "sequence0",
+            new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
+            new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 13L)),
+            kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null
+        )
+    );
+
+    final ListenableFuture<TaskStatus> future = runTask(task);
+
+    // Insert 2 records initially
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+      for (ProducerRecord<byte[], byte[]> record : Iterables.limit(records, 2)) {
+        kafkaProducer.send(record).get();
+      }
+      kafkaProducer.commitTransaction();
+    }
+
+    while (countEvents(task) != 2) {
+      Thread.sleep(25);
+    }
+
+    Assert.assertEquals(2, countEvents(task));
+    Assert.assertEquals(Status.READING, task.getRunner().getStatus());
+
+    // Verify the two indexed records
+    final QuerySegmentSpec firstInterval = objectMapper.readValue(
+        "\"2008/2010\"", QuerySegmentSpec.class
+    );
+    Iterable<ScanResultValue> scanResultValues = scanData(task, firstInterval);
+    Assert.assertEquals(2, Iterables.size(scanResultValues));
+
+    // Insert 3 more records and roll back
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+      for (ProducerRecord<byte[], byte[]> record : Iterables.limit(Iterables.skip(records, 2), 3)) {
+        kafkaProducer.send(record).get();
+      }
+      kafkaProducer.flush();
+      kafkaProducer.abortTransaction();
+    }
+
+    Assert.assertEquals(2, countEvents(task));
+    Assert.assertEquals(Status.READING, task.getRunner().getStatus());
+
+    final QuerySegmentSpec rollbackedInterval = objectMapper.readValue(
+        "\"2010/2012\"", QuerySegmentSpec.class
+    );
+    scanResultValues = scanData(task, rollbackedInterval);
+    // Verify that no records were indexed in the rolled-back time period
+    Assert.assertEquals(0, Iterables.size(scanResultValues));
+
+    // Insert remaining data
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+      for (ProducerRecord<byte[], byte[]> record : Iterables.skip(records, 5)) {
+        kafkaProducer.send(record).get();
+      }
+      kafkaProducer.commitTransaction();
+    }
+
+    final QuerySegmentSpec endInterval = objectMapper.readValue(
+        "\"2008/2049\"", QuerySegmentSpec.class
+    );
+    Iterable<ScanResultValue> scanResultValues1 = scanData(task, endInterval);
+    Assert.assertEquals(2, Iterables.size(scanResultValues1));
+
+    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+    Assert.assertEquals(task.getRunner().getEndOffsets(), task.getRunner().getCurrentOffsets());
+
+    // Check metrics
+    Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getUnparseable());
+    Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getThrownAway());
+
+    // Check published metadata
+    SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
+    SegmentDescriptor desc2 = SD(task, "2009/P1D", 0);
+    SegmentDescriptor desc3 = SD(task, "2013/P1D", 0);
+    SegmentDescriptor desc4 = SD(task, "2049/P1D", 0);
+    Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4), publishedDescriptors());
+    Assert.assertEquals(
+        new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 13L))),
+        metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
+    );
+
+    // Check segments in deep storage
+    Assert.assertEquals(ImmutableList.of("a"), readSegmentColumn("dim1", desc1));
+    Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", desc2));
+    Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc3));
+    Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc4));
+  }
+
+  private List<ScanResultValue> scanData(final Task task, QuerySegmentSpec spec)
+  {
+    ScanQuery query = new Druids.ScanQueryBuilder().dataSource(
+        DATA_SCHEMA.getDataSource()).intervals(spec).build();
+    List<ScanResultValue> results =
+        task.getQueryRunner(query).run(wrap(query), new HashMap<>()).toList();
+    return results;
+  }
+
+  private void insertData() throws ExecutionException, InterruptedException
+  {
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+      for (ProducerRecord<byte[], byte[]> record : records) {
+        kafkaProducer.send(record).get();
+      }
+      kafkaProducer.commitTransaction();
+    }
+  }
+
   private ListenableFuture<TaskStatus> runTask(final Task task)
   {
     try {
@@ -2338,7 +2385,7 @@ public class KafkaIndexTaskTest
     );
   }
 
-  private QueryRunnerFactoryConglomerate makeTimeseriesOnlyConglomerate()
+  private QueryRunnerFactoryConglomerate makeTimeseriesAndScanConglomerate()
   {
     IntervalChunkingQueryRunnerDecorator queryRunnerDecorator = new IntervalChunkingQueryRunnerDecorator(
         null,
@@ -2353,16 +2400,33 @@ public class KafkaIndexTaskTest
       }
     };
     return new DefaultQueryRunnerFactoryConglomerate(
-        ImmutableMap.of(
-            TimeseriesQuery.class,
-            new TimeseriesQueryRunnerFactory(
-                new TimeseriesQueryQueryToolChest(queryRunnerDecorator),
-                new TimeseriesQueryEngine(),
-                (query, future) -> {
-                  // do nothing
-                }
+        ImmutableMap.<Class<? extends Query>, QueryRunnerFactory>builder()
+            .put(
+                TimeseriesQuery.class,
+                new TimeseriesQueryRunnerFactory(
+                    new TimeseriesQueryQueryToolChest(queryRunnerDecorator),
+                    new TimeseriesQueryEngine(),
+                    new QueryWatcher()
+                    {
+                      @Override
+                      public void registerQuery(Query query, ListenableFuture future)
+                      {
+                        // do nothing
+                      }
+                    }
+                )
             )
-        )
+            .put(
+                ScanQuery.class,
+                new ScanQueryRunnerFactory(
+                    new ScanQueryQueryToolChest(
+                        new ScanQueryConfig(),
+                        new DefaultGenericQueryMetricsFactory(TestHelper.makeJsonMapper())
+                    ),
+                    new ScanQueryEngine()
+                )
+            )
+            .build()
     );
   }
 
@@ -2492,7 +2556,7 @@ public class KafkaIndexTaskTest
         new TestDataSegmentAnnouncer(),
         EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
         handoffNotifierFactory,
-        this::makeTimeseriesOnlyConglomerate,
+        this::makeTimeseriesAndScanConglomerate,
         Execs.directExecutor(), // queryExecutorService
         EasyMock.createMock(MonitorScheduler.class),
         new SegmentLoaderFactory(
@@ -2603,7 +2667,7 @@ public class KafkaIndexTaskTest
                                   .build();
 
     List<Result<TimeseriesResultValue>> results =
-        task.getQueryRunner(query).run(QueryPlus.wrap(query), ImmutableMap.of()).toList();
+        task.getQueryRunner(query).run(wrap(query), ImmutableMap.of()).toList();
 
     return results.isEmpty() ? 0L : DimensionHandlerUtils.nullToZero(results.get(0).getValue().getLongMetric("rows"));
   }
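
Note on the pattern above: every producer in these tests now wraps its sends in a Kafka
transaction. A minimal standalone sketch of that producer-side pattern follows; the broker
address, topic name, and transactional.id are placeholders, not values from this commit:

    import java.nio.charset.StandardCharsets;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class TransactionalProducerSketch
    {
      public static void main(String[] args) throws Exception
      {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");                  // placeholder
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());
        props.put("enable.idempotence", "true");                           // transactions require idempotence
        props.put("transactional.id", "sketch-producer");                  // unique per producer instance

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
          producer.initTransactions();  // registers the transactional.id and fences older instances
          producer.beginTransaction();
          producer.send(new ProducerRecord<>("sketch-topic", "payload".getBytes(StandardCharsets.UTF_8))).get();
          producer.commitTransaction(); // or abortTransaction(), as exercised by testRunTransactionModeRollback
        }
      }
    }

Each commitTransaction() or abortTransaction() appends a control record to the partition; it
occupies an offset but is never handed to consumers, which is why the expected end offsets in
the metadata assertions above shift upward (for example 5L to 6L) for the same data records.
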
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
index 2f445aa..f944bf0 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
@@ -166,11 +166,7 @@ public class KafkaRecordSupplierTest
   {
 
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();
 
     Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
         StreamPartition.of(topic, 0),
@@ -195,11 +191,7 @@ public class KafkaRecordSupplierTest
   {
 
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();
 
     Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
         StreamPartition.of(topic, 0),
@@ -232,10 +224,13 @@ public class KafkaRecordSupplierTest
   public void testPollAfterMoreDataAdded() throws InterruptedException, ExecutionException
   {
     // Insert data
-
-    KafkaProducer<byte[], byte[]> producer = kafkaServer.newProducer();
-    for (ProducerRecord<byte[], byte[]> record : records.subList(0, 13)) {
-      producer.send(record).get();
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+      for (ProducerRecord<byte[], byte[]> record : records.subList(0, 13)) {
+        kafkaProducer.send(record).get();
+      }
+      kafkaProducer.commitTransaction();
     }
 
     Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
@@ -257,8 +252,13 @@ public class KafkaRecordSupplierTest
     }
 
     // Insert data
-    for (ProducerRecord<byte[], byte[]> rec : records.subList(13, 15)) {
-      producer.send(rec).get();
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+      for (ProducerRecord<byte[], byte[]> record : records.subList(13, 15)) {
+        kafkaProducer.send(record).get();
+      }
+      kafkaProducer.commitTransaction();
     }
 
 
@@ -270,8 +270,28 @@ public class KafkaRecordSupplierTest
     List<OrderedPartitionableRecord<Integer, Long>> initialRecords = createOrderedPartitionableRecords();
 
     Assert.assertEquals(records.size(), polledRecords.size());
-    Assert.assertTrue(initialRecords.containsAll(polledRecords));
+    Assert.assertEquals(partitions, recordSupplier.getAssignment());
 
+    final int initialRecordsPartition0Size = initialRecords.stream()
+                                                           .filter(r -> r.getPartitionId().equals(0))
+                                                           .collect(Collectors.toSet())
+                                                           .size();
+    final int initialRecordsPartition1Size = initialRecords.stream()
+                                                           .filter(r -> r.getPartitionId().equals(1))
+                                                           .collect(Collectors.toSet())
+                                                           .size();
+
+    final int polledRecordsPartition0Size = polledRecords.stream()
+                                                         .filter(r -> r.getPartitionId().equals(0))
+                                                         .collect(Collectors.toSet())
+                                                         .size();
+    final int polledRecordsPartition1Size = polledRecords.stream()
+                                                         .filter(r -> r.getPartitionId().equals(1))
+                                                         .collect(Collectors.toSet())
+                                                         .size();
+
+    Assert.assertEquals(initialRecordsPartition0Size, polledRecordsPartition0Size);
+    Assert.assertEquals(initialRecordsPartition1Size, polledRecordsPartition1Size);
 
     recordSupplier.close();
   }
@@ -280,11 +300,7 @@ public class KafkaRecordSupplierTest
   public void testSeek() throws InterruptedException, ExecutionException
   {
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();
 
     StreamPartition<Integer> partition0 = StreamPartition.of(topic, 0);
     StreamPartition<Integer> partition1 = StreamPartition.of(topic, 1);
@@ -326,11 +342,7 @@ public class KafkaRecordSupplierTest
   public void testSeekToLatest() throws InterruptedException, ExecutionException
   {
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();
 
     StreamPartition<Integer> partition0 = StreamPartition.of(topic, 0);
     StreamPartition<Integer> partition1 = StreamPartition.of(topic, 1);
@@ -388,11 +400,7 @@ public class KafkaRecordSupplierTest
   public void testPosition() throws ExecutionException, InterruptedException
   {
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();
 
     StreamPartition<Integer> partition0 = StreamPartition.of(topic, 0);
     StreamPartition<Integer> partition1 = StreamPartition.of(topic, 1);
@@ -420,7 +428,7 @@ public class KafkaRecordSupplierTest
     Assert.assertEquals(0L, (long) recordSupplier.getPosition(partition0));
 
     recordSupplier.seekToLatest(Collections.singleton(partition0));
-    Assert.assertEquals(11L, (long) recordSupplier.getPosition(partition0));
+    Assert.assertEquals(12L, (long) recordSupplier.getPosition(partition0));
 
     long prevPos = recordSupplier.getPosition(partition0);
     recordSupplier.getEarliestSequenceNumber(partition0);
@@ -433,5 +441,16 @@ public class KafkaRecordSupplierTest
     recordSupplier.close();
   }
 
+  private void insertData() throws ExecutionException, InterruptedException
+  {
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+      for (ProducerRecord<byte[], byte[]> record : records) {
+        kafkaProducer.send(record).get();
+      }
+      kafkaProducer.commitTransaction();
+    }
+  }
 
 }
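
The one-higher positions asserted in this file (for example seekToLatest moving from 11 to 12)
follow from the same mechanics: the log-end offset that seekToLatest resolves to sits after the
transaction's commit marker, a control record that consumes an offset without ever being
returned by poll(). For the same reason, polled offsets can no longer be assumed contiguous,
which appears to be why testPollAfterMoreDataAdded gives up the containsAll assertion in favor
of per-partition record counts. The same shift shows up below in KafkaSupervisorTest's
latest-offset and lag expectations (1100L becoming 1101L).
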
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index 3337faa..ee3dfe8 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -78,7 +78,6 @@ public class KafkaSupervisorIOConfigTest
     Assert.assertEquals(Duration.standardMinutes(30), config.getCompletionTimeout());
     Assert.assertFalse("lateMessageRejectionPeriod", config.getLateMessageRejectionPeriod().isPresent());
     Assert.assertFalse("earlyMessageRejectionPeriod", config.getEarlyMessageRejectionPeriod().isPresent());
-    Assert.assertFalse("skipOffsetGaps", config.isSkipOffsetGaps());
   }
 
   @Test
@@ -97,8 +96,7 @@ public class KafkaSupervisorIOConfigTest
                      + "  \"useEarliestOffset\": true,\n"
                      + "  \"completionTimeout\": \"PT45M\",\n"
                      + "  \"lateMessageRejectionPeriod\": \"PT1H\",\n"
-                     + "  \"earlyMessageRejectionPeriod\": \"PT1H\",\n"
-                     + "  \"skipOffsetGaps\": true\n"
+                     + "  \"earlyMessageRejectionPeriod\": \"PT1H\"\n"
                      + "}";
 
     KafkaSupervisorIOConfig config = mapper.readValue(
@@ -122,7 +120,6 @@ public class KafkaSupervisorIOConfigTest
     Assert.assertEquals(Duration.standardMinutes(45), config.getCompletionTimeout());
     Assert.assertEquals(Duration.standardHours(1), config.getLateMessageRejectionPeriod().get());
     Assert.assertEquals(Duration.standardHours(1), config.getEarlyMessageRejectionPeriod().get());
-    Assert.assertTrue("skipOffsetGaps", config.isSkipOffsetGaps());
   }
 
   @Test
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index a08806a..f2db280 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -258,7 +258,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test
   public void testNoInitialState() throws Exception
   {
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+    supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
     addSomeEvents(1);
 
     Capture<KafkaIndexTask> captured = Capture.newInstance();
@@ -289,7 +289,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction());
     Assert.assertFalse("minimumMessageTime", taskConfig.getMinimumMessageTime().isPresent());
     Assert.assertFalse("maximumMessageTime", taskConfig.getMaximumMessageTime().isPresent());
-    Assert.assertFalse("skipOffsetGaps", taskConfig.isSkipOffsetGaps());
 
     Assert.assertEquals(topic, taskConfig.getStartPartitions().getStream());
     Assert.assertEquals(0L, (long) taskConfig.getStartPartitions().getPartitionSequenceNumberMap().get(0));
@@ -305,7 +304,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test
   public void testSkipOffsetGaps() throws Exception
   {
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, true);
+    supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
     addSomeEvents(1);
 
     Capture<KafkaIndexTask> captured = Capture.newInstance();
@@ -328,13 +327,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
     KafkaIndexTask task = captured.getValue();
     KafkaIndexTaskIOConfig taskConfig = task.getIOConfig();
 
-    Assert.assertTrue("skipOffsetGaps", taskConfig.isSkipOffsetGaps());
   }
 
   @Test
   public void testMultiTask() throws Exception
   {
-    supervisor = getSupervisor(1, 2, true, "PT1H", null, null, false);
+    supervisor = getSupervisor(1, 2, true, "PT1H", null, null);
     addSomeEvents(1);
 
     Capture<KafkaIndexTask> captured = Capture.newInstance(CaptureType.ALL);
@@ -380,7 +378,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test
   public void testReplicas() throws Exception
   {
-    supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false);
+    supervisor = getSupervisor(2, 1, true, "PT1H", null, null);
     addSomeEvents(1);
 
     Capture<KafkaIndexTask> captured = Capture.newInstance(CaptureType.ALL);
@@ -417,7 +415,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test
   public void testLateMessageRejectionPeriod() throws Exception
   {
-    supervisor = getSupervisor(2, 1, true, "PT1H", new Period("PT1H"), null, false);
+    supervisor = getSupervisor(2, 1, true, "PT1H", new Period("PT1H"), null);
     addSomeEvents(1);
 
     Capture<KafkaIndexTask> captured = Capture.newInstance(CaptureType.ALL);
@@ -456,7 +454,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test
   public void testEarlyMessageRejectionPeriod() throws Exception
   {
-    supervisor = getSupervisor(2, 1, true, "PT1H", null, new Period("PT1H"), false);
+    supervisor = getSupervisor(2, 1, true, "PT1H", null, new Period("PT1H"));
     addSomeEvents(1);
 
     Capture<KafkaIndexTask> captured = Capture.newInstance(CaptureType.ALL);
@@ -498,7 +496,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
    */
   public void testLatestOffset() throws Exception
   {
-    supervisor = getSupervisor(1, 1, false, "PT1H", null, null, false);
+    supervisor = getSupervisor(1, 1, false, "PT1H", null, null);
     addSomeEvents(1100);
 
     Capture<KafkaIndexTask> captured = Capture.newInstance();
@@ -518,9 +516,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
     verifyAll();
 
     KafkaIndexTask task = captured.getValue();
-    Assert.assertEquals(1100L, (long) task.getIOConfig().getStartPartitions().getPartitionSequenceNumberMap().get(0));
-    Assert.assertEquals(1100L, (long) task.getIOConfig().getStartPartitions().getPartitionSequenceNumberMap().get(1));
-    Assert.assertEquals(1100L, (long) task.getIOConfig().getStartPartitions().getPartitionSequenceNumberMap().get(2));
+    Assert.assertEquals(1101L, (long) task.getIOConfig().getStartPartitions().getPartitionSequenceNumberMap().get(0));
+    Assert.assertEquals(1101L, (long) task.getIOConfig().getStartPartitions().getPartitionSequenceNumberMap().get(1));
+    Assert.assertEquals(1101L, (long) task.getIOConfig().getStartPartitions().getPartitionSequenceNumberMap().get(2));
   }
 
   @Test
@@ -530,7 +528,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
    */
   public void testDatasourceMetadata() throws Exception
   {
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+    supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
     addSomeEvents(100);
 
     Capture<KafkaIndexTask> captured = Capture.newInstance();
@@ -560,7 +558,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test(expected = ISE.class)
   public void testBadMetadataOffsets() throws Exception
   {
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+    supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
     addSomeEvents(1);
 
     expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
@@ -579,7 +577,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test
   public void testKillIncompatibleTasks() throws Exception
   {
-    supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false);
+    supervisor = getSupervisor(2, 1, true, "PT1H", null, null);
     addSomeEvents(1);
 
     // unexpected # of partitions (kill)
@@ -684,7 +682,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test
   public void testKillBadPartitionAssignment() throws Exception
   {
-    supervisor = getSupervisor(1, 2, true, "PT1H", null, null, false);
+    supervisor = getSupervisor(1, 2, true, "PT1H", null, null);
     addSomeEvents(1);
 
     Task id1 = createKafkaIndexTask(
@@ -791,7 +789,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test
   public void testRequeueTaskWhenFailed() throws Exception
   {
-    supervisor = getSupervisor(2, 2, true, "PT1H", null, null, false);
+    supervisor = getSupervisor(2, 2, true, "PT1H", null, null);
     addSomeEvents(1);
 
     Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
@@ -880,7 +878,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test
   public void testRequeueAdoptedTaskWhenFailed() throws Exception
   {
-    supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false);
+    supervisor = getSupervisor(2, 1, true, "PT1H", null, null);
     addSomeEvents(1);
 
     DateTime now = DateTimes.nowUtc();
@@ -981,7 +979,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test
   public void testQueueNextTasksOnSuccess() throws Exception
   {
-    supervisor = getSupervisor(2, 2, true, "PT1H", null, null, false);
+    supervisor = getSupervisor(2, 2, true, "PT1H", null, null);
     addSomeEvents(1);
 
     Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
@@ -1083,7 +1081,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   {
     final TaskLocation location = new TaskLocation("testHost", 1234, -1);
 
-    supervisor = getSupervisor(2, 2, true, "PT1M", null, null, false);
+    supervisor = getSupervisor(2, 2, true, "PT1M", null, null);
     addSomeEvents(100);
 
     Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
@@ -1178,7 +1176,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   {
     final TaskLocation location = new TaskLocation("testHost", 1234, -1);
 
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+    supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
     addSomeEvents(1);
 
     Task task = createKafkaIndexTask(
@@ -1284,7 +1282,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   {
     final TaskLocation location = new TaskLocation("testHost", 1234, -1);
 
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+    supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
     addSomeEvents(1);
 
     Task task = createKafkaIndexTask(
@@ -1383,7 +1381,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
     final DateTime startTime = DateTimes.nowUtc();
 
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+    supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
     addSomeEvents(6);
 
     Task id1 = createKafkaIndexTask(
@@ -1474,23 +1472,23 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertEquals(startTime, activeReport.getStartTime());
     Assert.assertEquals(ImmutableMap.of(0, 1L, 1, 2L, 2, 3L), activeReport.getStartingOffsets());
     Assert.assertEquals(ImmutableMap.of(0, 4L, 1, 5L, 2, 6L), activeReport.getCurrentOffsets());
-    Assert.assertEquals(ImmutableMap.of(0, 2L, 1, 1L, 2, 0L), activeReport.getLag());
+    Assert.assertEquals(ImmutableMap.of(0, 3L, 1, 2L, 2, 1L), activeReport.getLag());
 
     Assert.assertEquals("id1", publishingReport.getId());
     Assert.assertEquals(ImmutableMap.of(0, 0L, 1, 0L, 2, 0L), publishingReport.getStartingOffsets());
     Assert.assertEquals(ImmutableMap.of(0, 1L, 1, 2L, 2, 3L), publishingReport.getCurrentOffsets());
     Assert.assertEquals(null, publishingReport.getLag());
 
-    Assert.assertEquals(ImmutableMap.of(0, 6L, 1, 6L, 2, 6L), payload.getLatestOffsets());
-    Assert.assertEquals(ImmutableMap.of(0, 2L, 1, 1L, 2, 0L), payload.getMinimumLag());
-    Assert.assertEquals(3L, (long) payload.getAggregateLag());
+    Assert.assertEquals(ImmutableMap.of(0, 7L, 1, 7L, 2, 7L), payload.getLatestOffsets());
+    Assert.assertEquals(ImmutableMap.of(0, 3L, 1, 2L, 2, 1L), payload.getMinimumLag());
+    Assert.assertEquals(6L, (long) payload.getAggregateLag());
     Assert.assertTrue(payload.getOffsetsLastUpdated().plusMinutes(1).isAfterNow());
   }
 
   @Test
   public void testKillUnresponsiveTasksWhileGettingStartTime() throws Exception
   {
-    supervisor = getSupervisor(2, 2, true, "PT1H", null, null, false);
+    supervisor = getSupervisor(2, 2, true, "PT1H", null, null);
     addSomeEvents(1);
 
     Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
@@ -1549,7 +1547,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   {
     final TaskLocation location = new TaskLocation("testHost", 1234, -1);
 
-    supervisor = getSupervisor(2, 2, true, "PT1M", null, null, false);
+    supervisor = getSupervisor(2, 2, true, "PT1M", null, null);
     addSomeEvents(100);
 
     Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
@@ -1635,7 +1633,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   {
     final TaskLocation location = new TaskLocation("testHost", 1234, -1);
 
-    supervisor = getSupervisor(2, 2, true, "PT1M", null, null, false);
+    supervisor = getSupervisor(2, 2, true, "PT1M", null, null);
     addSomeEvents(100);
 
     Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
@@ -1726,7 +1724,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test(expected = IllegalStateException.class)
   public void testStopNotStarted()
   {
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+    supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
     supervisor.stop(false);
   }
 
@@ -1738,7 +1736,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     taskRunner.unregisterListener(StringUtils.format("KafkaSupervisor-%s", DATASOURCE));
     replayAll();
 
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+    supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
     supervisor.start();
     supervisor.stop(false);
 
@@ -1752,7 +1750,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
     final DateTime startTime = DateTimes.nowUtc();
 
-    supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false);
+    supervisor = getSupervisor(2, 1, true, "PT1H", null, null);
     addSomeEvents(1);
 
     Task id1 = createKafkaIndexTask(
@@ -1864,7 +1862,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
     replayAll();
 
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+    supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
     supervisor.start();
     supervisor.runInternal();
     verifyAll();
@@ -1881,7 +1879,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test
   public void testResetDataSourceMetadata() throws Exception
   {
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+    supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
     expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
@@ -1937,7 +1935,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test
   public void testResetNoDataSourceMetadata() throws Exception
   {
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+    supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
     expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
@@ -1970,7 +1968,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
     final DateTime startTime = DateTimes.nowUtc();
 
-    supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false);
+    supervisor = getSupervisor(2, 1, true, "PT1H", null, null);
     addSomeEvents(1);
 
     Task id1 = createKafkaIndexTask(
@@ -2068,7 +2066,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   public void testNoDataIngestionTasks() throws Exception
   {
     final DateTime startTime = DateTimes.nowUtc();
-    supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false);
+    supervisor = getSupervisor(2, 1, true, "PT1S", null, null);
     //not adding any events
     Task id1 = createKafkaIndexTask(
         "id1",
@@ -2164,7 +2162,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   public void testCheckpointForInactiveTaskGroup()
       throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException
   {
-    supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false);
+    supervisor = getSupervisor(2, 1, true, "PT1S", null, null);
     //not adding any events
     final Task id1 = createKafkaIndexTask(
         "id1",
@@ -2267,7 +2265,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   public void testCheckpointForUnknownTaskGroup()
       throws InterruptedException
   {
-    supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false);
+    supervisor = getSupervisor(2, 1, true, "PT1S", null, null);
     //not adding any events
     final Task id1 = createKafkaIndexTask(
         "id1",
@@ -2346,7 +2344,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   public void testCheckpointWithNullTaskGroupId()
       throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException
   {
-    supervisor = getSupervisor(1, 3, true, "PT1S", null, null, false);
+    supervisor = getSupervisor(1, 3, true, "PT1S", null, null);
     //not adding any events
     final Task id1 = createKafkaIndexTask(
         "id1",
@@ -2438,7 +2436,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test
   public void testSuspendedNoRunningTasks() throws Exception
   {
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false, true, kafkaHost);
+    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, true, kafkaHost);
     addSomeEvents(1);
 
     expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -2471,7 +2469,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
     final DateTime startTime = DateTimes.nowUtc();
 
-    supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false, true, kafkaHost);
+    supervisor = getSupervisor(2, 1, true, "PT1H", null, null, true, kafkaHost);
     addSomeEvents(1);
 
     Task id1 = createKafkaIndexTask(
@@ -2579,7 +2577,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
     replayAll();
 
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false, true, kafkaHost);
+    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, true, kafkaHost);
     supervisor.start();
     supervisor.runInternal();
     verifyAll();
@@ -2604,7 +2602,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
         null,
         null,
         false,
-        false,
         StringUtils.format("badhostname:%d", kafkaServer.getPort())
     );
     addSomeEvents(1);
@@ -2670,7 +2667,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction());
     Assert.assertFalse("minimumMessageTime", taskConfig.getMinimumMessageTime().isPresent());
     Assert.assertFalse("maximumMessageTime", taskConfig.getMaximumMessageTime().isPresent());
-    Assert.assertFalse("skipOffsetGaps", taskConfig.isSkipOffsetGaps());
 
     Assert.assertEquals(topic, taskConfig.getStartPartitions().getStream());
     Assert.assertEquals(0L, (long) taskConfig.getStartPartitions().getPartitionSequenceNumberMap().get(0));
@@ -2686,7 +2682,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test
   public void testGetCurrentTotalStats()
   {
-    supervisor = getSupervisor(1, 2, true, "PT1H", null, null, false);
+    supervisor = getSupervisor(1, 2, true, "PT1H", null, null, false, kafkaHost);
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         supervisor.getTaskGroupIdForPartition(0),
         ImmutableMap.of(0, 0L),
@@ -2734,6 +2730,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
     AdminUtils.createTopic(zkUtils, topic, NUM_PARTITIONS, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
 
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
       for (int i = 0; i < NUM_PARTITIONS; i++) {
         for (int j = 0; j < numEventsPerPartition; j++) {
           kafkaProducer.send(
@@ -2746,6 +2744,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
           ).get();
         }
       }
+      kafkaProducer.commitTransaction();
     }
   }
 
@@ -2755,8 +2754,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
       boolean useEarliestOffset,
       String duration,
       Period lateMessageRejectionPeriod,
-      Period earlyMessageRejectionPeriod,
-      boolean skipOffsetGaps
+      Period earlyMessageRejectionPeriod
   )
   {
     return getSupervisor(
@@ -2766,7 +2764,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
         duration,
         lateMessageRejectionPeriod,
         earlyMessageRejectionPeriod,
-        skipOffsetGaps,
         false,
         kafkaHost
     );
@@ -2779,7 +2776,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
       String duration,
       Period lateMessageRejectionPeriod,
       Period earlyMessageRejectionPeriod,
-      boolean skipOffsetGaps,
       boolean suspended,
       String kafkaHost
   )
@@ -2787,6 +2783,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Map<String, Object> consumerProperties = new HashMap<>();
     consumerProperties.put("myCustomKey", "myCustomValue");
     consumerProperties.put("bootstrap.servers", kafkaHost);
+    consumerProperties.put("isolation.level", "read_committed");
     KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
         topic,
         replicas,
@@ -2799,8 +2796,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
         useEarliestOffset,
         new Period("PT30M"),
         lateMessageRejectionPeriod,
-        earlyMessageRejectionPeriod,
-        skipOffsetGaps
+        earlyMessageRejectionPeriod
     );
 
     KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory(
@@ -2908,8 +2904,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
             minimumMessageTime,
-            maximumMessageTime,
-            false
+            maximumMessageTime
         ),
         Collections.emptyMap(),
         null,
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
index 6659f92..10c9b2e 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
@@ -40,11 +40,12 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
 
 public class TestBroker implements Closeable
 {
-
+  private static final Random RANDOM = new Random(); // caching ThreadLocalRandom.current() would be unsafe off-thread
   private final String zookeeperConnect;
   private final File directory;
   private final boolean directoryCleanup;
@@ -77,6 +78,9 @@ public class TestBroker implements Closeable
     props.setProperty("broker.id", String.valueOf(id));
     props.setProperty("port", String.valueOf(ThreadLocalRandom.current().nextInt(9999) + 10000));
     props.setProperty("advertised.host.name", "localhost");
+    props.setProperty("transaction.state.log.replication.factor", "1");
+    props.setProperty("offsets.topic.replication.factor", "1");
+    props.setProperty("transaction.state.log.min.isr", "1");
     props.putAll(brokerProps);
 
     final KafkaConfig config = new KafkaConfig(props);
@@ -112,6 +116,8 @@ public class TestBroker implements Closeable
     props.put("key.serializer", ByteArraySerializer.class.getName());
     props.put("value.serializer", ByteArraySerializer.class.getName());
     props.put("acks", "all");
+    props.put("enable.idempotence", "true");
+    props.put("transactional.id", String.valueOf(RANDOM.nextInt()));
     return props;
   }
 
@@ -121,8 +127,9 @@ public class TestBroker implements Closeable
     props.put("bootstrap.servers", StringUtils.format("localhost:%d", getPort()));
     props.put("key.deserializer", ByteArrayDeserializer.class.getName());
     props.put("value.deserializer", ByteArrayDeserializer.class.getName());
-    props.put("group.id", String.valueOf(ThreadLocalRandom.current().nextInt()));
+    props.put("group.id", String.valueOf(RANDOM.nextInt()));
     props.put("auto.offset.reset", "earliest");
+    props.put("isolation.level", "read_committed");
     return props;
   }
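
TestBroker pins transaction.state.log.replication.factor, offsets.topic.replication.factor, and
transaction.state.log.min.isr to 1 because their defaults (3, 3, and 2) cannot be satisfied on a
single-broker test cluster, and every test consumer now reads with isolation.level=read_committed.
A minimal sketch of such a consumer (broker address, group id, and topic are placeholders):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class ReadCommittedConsumerSketch
    {
      public static void main(String[] args)
      {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");                  // placeholder
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        props.put("group.id", "sketch-group");                             // placeholder
        props.put("isolation.level", "read_committed");                    // hide aborted transactions

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
          final TopicPartition tp = new TopicPartition("sketch-topic", 0); // placeholder topic
          consumer.assign(Collections.singletonList(tp));
          consumer.seekToBeginning(Collections.singletonList(tp));
          for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(5))) {
            // offsets may have gaps: aborted data and control markers are filtered out
            System.out.println(record.partition() + ":" + record.offset());
          }
        }
      }
    }
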
 
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java
index 307e971..8dd32f2 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java
@@ -68,7 +68,6 @@ public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<St
         useTransaction,
         minimumMessageTime,
         maximumMessageTime,
-        true,
         exclusiveStartSequenceNumberPartitions
     );
     Preconditions.checkArgument(endPartitions.getPartitionSequenceNumberMap()
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java
index fff34f9..ab78229 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java
@@ -89,7 +89,6 @@ public class KinesisIOConfigTest
     Assert.assertNull(config.getAwsAssumedRoleArn());
     Assert.assertNull(config.getAwsExternalId());
     Assert.assertFalse(config.isDeaggregate());
-    Assert.assertTrue(config.isSkipOffsetGaps());
   }
 
   @Test
@@ -146,7 +145,6 @@ public class KinesisIOConfigTest
     Assert.assertEquals("role", config.getAwsAssumedRoleArn());
     Assert.assertEquals("awsexternalid", config.getAwsExternalId());
     Assert.assertTrue(config.isDeaggregate());
-    Assert.assertTrue(config.isSkipOffsetGaps());
   }
 
   @Test
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 2918696..bfabab1 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -64,7 +64,7 @@ import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
 
 
-public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType> extends AbstractTask
+public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType extends Comparable> extends AbstractTask
     implements ChatHandler
 {
   public static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java
index 6c469c7..dde9702 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java
@@ -43,7 +43,6 @@ public abstract class SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceO
   private final boolean useTransaction;
   private final Optional<DateTime> minimumMessageTime;
   private final Optional<DateTime> maximumMessageTime;
-  private final boolean skipOffsetGaps;
   private final Set<PartitionIdType> exclusiveStartSequenceNumberPartitions;
 
   @JsonCreator
@@ -55,7 +54,6 @@ public abstract class SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceO
       @JsonProperty("useTransaction") Boolean useTransaction,
       @JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
       @JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
-      @JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps,
       @JsonProperty("exclusiveStartSequenceNumberPartitions")
           Set<PartitionIdType> exclusiveStartSequenceNumberPartitions
   )
@@ -67,7 +65,6 @@ public abstract class SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceO
     this.useTransaction = useTransaction != null ? useTransaction : DEFAULT_USE_TRANSACTION;
     this.minimumMessageTime = Optional.fromNullable(minimumMessageTime);
     this.maximumMessageTime = Optional.fromNullable(maximumMessageTime);
-    this.skipOffsetGaps = skipOffsetGaps != null ? skipOffsetGaps : DEFAULT_SKIP_OFFSET_GAPS;
     this.exclusiveStartSequenceNumberPartitions = exclusiveStartSequenceNumberPartitions == null
                                                   ? Collections.emptySet()
                                                   : exclusiveStartSequenceNumberPartitions;
@@ -137,9 +134,4 @@ public abstract class SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceO
     return minimumMessageTime;
   }
 
-  @JsonProperty
-  public boolean isSkipOffsetGaps()
-  {
-    return skipOffsetGaps;
-  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index c86a2b5..b2b906e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -130,7 +130,7 @@ import java.util.stream.Collectors;
  * @param <PartitionIdType>    Partition Number Type
  * @param <SequenceOffsetType> Sequence Number Type
  */
-public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> implements ChatHandler
+public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType extends Comparable> implements ChatHandler
 {
   public enum Status
   {
@@ -474,7 +474,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
           }
 
           // if stop is requested or task's end sequence is set by call to setEndOffsets method with finish set to true
-          if (stopRequested.get() || sequences.get(sequences.size() - 1).isCheckpointed()) {
+          if (stopRequested.get() || sequences.size() == 0 || sequences.get(sequences.size() - 1).isCheckpointed()) {
             status = Status.PUBLISHING;
           }
 
@@ -504,10 +504,11 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
           SequenceMetadata sequenceToCheckpoint = null;
           for (OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType> record : records) {
 
+
             // for Kafka, the end offsets are exclusive, so skip it
             if (isEndSequenceOffsetsExclusive() &&
                 createSequenceNumber(record.getSequenceNumber()).compareTo(
-                    createSequenceNumber(endOffsets.get(record.getPartitionId()))) == 0) {
+                    createSequenceNumber(endOffsets.get(record.getPartitionId()))) >= 0) {
               continue;
             }
 
@@ -530,17 +531,6 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
             } else if (createSequenceNumber(record.getSequenceNumber()).compareTo(
                 createSequenceNumber(endOffsets.get(record.getPartitionId()))) <= 0) {
 
-
-              if (!record.getSequenceNumber().equals(currOffsets.get(record.getPartitionId()))
-                  && !ioConfig.isSkipOffsetGaps()) {
-                throw new ISE(
-                    "WTF?! Got sequence[%s] after sequence[%s] in partition[%s].",
-                    record.getSequenceNumber(),
-                    currOffsets.get(record.getPartitionId()),
-                    record.getPartitionId()
-                );
-              }
-
               try {
                 final List<byte[]> valueBytess = record.getData();
                 final List<InputRow> rows;
@@ -1897,7 +1887,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   )
   {
     if (intialSequenceSnapshot.containsKey(record.getPartitionId())) {
-      if (!intialSequenceSnapshot.get(record.getPartitionId()).equals(record.getSequenceNumber())) {
+      if (record.getSequenceNumber().compareTo(intialSequenceSnapshot.get(record.getPartitionId())) < 0) {
         throw new ISE(
             "Starting sequenceNumber [%s] does not match expected [%s] for partition [%s]",
             record.getSequenceNumber(),
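
With read_committed consumption, offset gaps are the normal case rather than a corruption
signal, so the skipOffsetGaps flag and its "WTF?!" guard are removed outright and the runner's
bounds checks become order-based. That is what forces the extends Comparable bound on
SequenceOffsetType in SeekableStreamIndexTask above and in OrderedPartitionableRecord,
RecordSupplier, and SeekableStreamSupervisor below. A hedged sketch of the two relaxed checks
(method names are illustrative, not the runner's actual API):

    public class OffsetGapSketch
    {
      // Kafka end offsets are exclusive: drop anything at or past the end, not just
      // an exact match, since a control marker may occupy the end offset itself
      static <T extends Comparable<T>> boolean atOrPastEnd(T current, T end)
      {
        return current.compareTo(end) >= 0;
      }

      // a restored task only rejects records earlier than its snapshot; with
      // transactions the first polled offset can legitimately be later
      static <T extends Comparable<T>> boolean earlierThanSnapshot(T polled, T snapshot)
      {
        return polled.compareTo(snapshot) < 0;
      }

      public static void main(String[] args)
      {
        System.out.println(atOrPastEnd(6L, 5L));         // true: past the exclusive end, skipped
        System.out.println(earlierThanSnapshot(3L, 2L)); // false: a gap after the snapshot is fine
      }
    }
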
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java
index 4dd653e..a122d9e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java
@@ -35,7 +35,7 @@ import java.util.stream.Collectors;
  * @param <PartitionIdType>    partition id
  * @param <SequenceOffsetType> sequence number
  */
-public class OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType>
+public class OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType extends Comparable>
 {
   private final String stream;
   private final PartitionIdType partitionId;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
index d9e599d..3a6e87e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
@@ -36,7 +36,7 @@ import java.util.Set;
  * @param <SequenceOffsetType> Sequence Number Type
  */
 @Beta
-public interface RecordSupplier<PartitionIdType, SequenceOffsetType> extends Closeable
+public interface RecordSupplier<PartitionIdType, SequenceOffsetType extends Comparable> extends Closeable
 {
   /**
    * assigns the given partitions to this RecordSupplier
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 25250ac..4c6509d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -119,7 +119,7 @@ import java.util.stream.Stream;
  * @param <PartitionIdType>    the type of the partition id, for example, partitions in Kafka are int type while partitions in Kinesis are String type
  * @param <SequenceOffsetType> the type of the sequence number or offsets, for example, Kafka uses long offsets while Kinesis uses String sequence numbers
  */
-public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>
+public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType extends Comparable>
     implements Supervisor
 {
   public static final String IS_INCREMENTAL_HANDOFF_SUPPORTED = "IS_INCREMENTAL_HANDOFF_SUPPORTED";
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 24c910c..6159021 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -32,7 +32,7 @@
     </parent>
 
     <properties>
-        <apache.kafka.version>0.10.2.2</apache.kafka.version>
+        <apache.kafka.version>2.1.0</apache.kafka.version>
     </properties>
 
     <dependencies>
@@ -53,17 +53,6 @@
         </dependency>
         <dependency>
             <groupId>org.apache.druid.extensions</groupId>
-            <artifactId>druid-kafka-eight</artifactId>
-            <version>${project.parent.version}</version>
-            <exclusions>
-                <exclusion>
-                    <artifactId>kafka_2.10</artifactId>
-                    <groupId>org.apache.kafka</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.druid.extensions</groupId>
             <artifactId>druid-histogram</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
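
The client upgrade from 0.10.2.2 to 2.1.0 is what makes the above possible: the transactions API
and read_committed isolation only exist from Kafka 0.11 onward. The now-deprecated kafka-eight
firehose dependency is dropped here, and its ITKafkaTest (deleted below) goes with it.
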
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaTest.java
deleted file mode 100644
index 10f9aab..0000000
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaTest.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.indexer;
-
-import com.google.common.base.Throwables;
-import com.google.inject.Inject;
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
-import org.apache.commons.io.IOUtils;
-import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.testing.IntegrationTestingConfig;
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.testing.utils.RetryUtil;
-import org.apache.druid.testing.utils.TestQueryHelper;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeSuite;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Callable;
-
-/*
- * This is a test for the kafka firehose.
- */
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITKafkaTest extends AbstractIndexerTest
-{
-  private static final Logger LOG = new Logger(ITKafkaTest.class);
-  private static final int DELAY_BETWEEN_EVENTS_SECS = 5;
-  private static final String INDEXER_FILE = "/indexer/kafka_index_task.json";
-  private static final String QUERIES_FILE = "/indexer/kafka_index_queries.json";
-  private static final String DATASOURCE = "kafka_test";
-  private static final String TOPIC_NAME = "kafkaTopic";
-  private static final int MINUTES_TO_SEND = 2;
-  public static final String testPropertyPrefix = "kafka.test.property.";
-
-
-  // We'll fill in the current time and numbers for added, deleted and changed
-  // before sending the event.
-  final String event_template =
-      "{\"timestamp\": \"%s\"," +
-      "\"page\": \"Gypsy Danger\"," +
-      "\"language\" : \"en\"," +
-      "\"user\" : \"nuclear\"," +
-      "\"unpatrolled\" : \"true\"," +
-      "\"newPage\" : \"true\"," +
-      "\"robot\": \"false\"," +
-      "\"anonymous\": \"false\"," +
-      "\"namespace\":\"article\"," +
-      "\"continent\":\"North America\"," +
-      "\"country\":\"United States\"," +
-      "\"region\":\"Bay Area\"," +
-      "\"city\":\"San Francisco\"," +
-      "\"added\":%d," +
-      "\"deleted\":%d," +
-      "\"delta\":%d}";
-
-  private String taskID;
-  private ZkClient zkClient;
-  private ZkUtils zkUtils;
-  private boolean segmentsExist;   // to tell if we should remove segments during teardown
-
-  // format for the querying interval
-  private final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'");
-  // format for the expected timestamp in a query response
-  private final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");
-  private DateTime dtFirst;                // timestamp of 1st event
-  private DateTime dtLast;                 // timestamp of last event
-
-  @Inject
-  private TestQueryHelper queryHelper;
-  @Inject
-  private IntegrationTestingConfig config;
-
-  private String fullDatasourceName;
-
-  @BeforeSuite
-  public void setFullDatasourceName()
-  {
-    fullDatasourceName = DATASOURCE + config.getExtraDatasourceNameSuffix();
-  }
-  @Test
-  public void testKafka()
-  {
-    LOG.info("Starting test: ITKafkaTest");
-
-    // create topic
-    try {
-      int sessionTimeoutMs = 10000;
-      int connectionTimeoutMs = 10000;
-      String zkHosts = config.getZookeeperHosts();
-      zkClient = new ZkClient(zkHosts, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer$.MODULE$);
-      zkUtils = new ZkUtils(zkClient, new ZkConnection(zkHosts, sessionTimeoutMs), false);
-      if (config.manageKafkaTopic()) {
-        int numPartitions = 1;
-        int replicationFactor = 1;
-        Properties topicConfig = new Properties();
-        // addFilteredProperties(topicConfig);
-        AdminUtils.createTopic(
-            zkUtils,
-            TOPIC_NAME,
-            numPartitions,
-            replicationFactor,
-            topicConfig,
-            RackAwareMode.Disabled$.MODULE$
-        );
-      }
-    }
-    catch (Exception e) {
-      throw new ISE(e, "could not create kafka topic");
-    }
-
-    // set up kafka producer
-    Properties properties = new Properties();
-    addFilteredProperties(properties);
-    properties.put("bootstrap.servers", config.getKafkaHost());
-    LOG.info("Kafka bootstrap.servers: [%s]", config.getKafkaHost());
-    properties.put("acks", "all");
-    properties.put("retries", "3");
-
-    KafkaProducer<String, String> producer = new KafkaProducer<>(
-        properties,
-        new StringSerializer(),
-        new StringSerializer()
-    );
-
-    DateTimeZone zone = DateTimes.inferTzFromString("UTC");
-    // format for putting into events
-    DateTimeFormatter event_fmt = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");
-
-    DateTime dt = new DateTime(zone); // timestamp to put on events
-    dtFirst = dt;            // timestamp of 1st event
-    dtLast = dt;             // timestamp of last event
-
-    // these are used to compute the expected aggregations
-    int added = 0;
-    int num_events = 10;
-
-    // send data to kafka
-    for (int i = 0; i < num_events; i++) {
-      added += i;
-      // construct the event to send
-      String event = StringUtils.format(
-          event_template,
-          event_fmt.print(dt), i, 0, i
-      );
-      LOG.info("sending event: [%s]", event);
-      try {
-        // Send event to kafka
-        producer.send(new ProducerRecord<String, String>(TOPIC_NAME, event)).get();
-      }
-      catch (Exception ioe) {
-        throw Throwables.propagate(ioe);
-      }
-
-      dtLast = dt;
-      dt = new DateTime(zone);
-    }
-
-    producer.close();
-
-    String indexerSpec;
-
-    // replace temp strings in indexer file
-    try {
-      LOG.info("indexerFile name: [%s]", INDEXER_FILE);
-
-      Properties consumerProperties = new Properties();
-      consumerProperties.put("zookeeper.connect", config.getZookeeperInternalHosts());
-      consumerProperties.put("zookeeper.connection.timeout.ms", "15000");
-      consumerProperties.put("zookeeper.sync.time.ms", "5000");
-      consumerProperties.put("group.id", Long.toString(System.currentTimeMillis()));
-      consumerProperties.put("fetch.message.max.bytes", "1048586");
-      consumerProperties.put("auto.offset.reset", "smallest");
-      consumerProperties.put("auto.commit.enable", "false");
-
-      addFilteredProperties(consumerProperties);
-
-      indexerSpec = getTaskAsString(INDEXER_FILE);
-      indexerSpec = StringUtils.replace(indexerSpec, "%%DATASOURCE%%", fullDatasourceName);
-      indexerSpec = StringUtils.replace(indexerSpec, "%%TOPIC%%", TOPIC_NAME);
-      indexerSpec = StringUtils.replace(indexerSpec, "%%COUNT%%", Integer.toString(num_events));
-      String consumerPropertiesJson = jsonMapper.writeValueAsString(consumerProperties);
-      indexerSpec = StringUtils.replace(indexerSpec, "%%CONSUMER_PROPERTIES%%", consumerPropertiesJson);
-
-      LOG.info("indexerFile: [%s]\n", indexerSpec);
-    }
-    catch (Exception e) {
-      // log here so the message will appear in the console output
-      LOG.error("could not read indexer file [%s]", INDEXER_FILE);
-      throw new ISE(e, "could not read indexer file [%s]", INDEXER_FILE);
-    }
-
-    // start indexing task
-    taskID = indexer.submitTask(indexerSpec);
-    LOG.info("-------------SUBMITTED TASK");
-
-    // wait for the task to finish
-    indexer.waitUntilTaskCompletes(taskID, 10000, 60);
-
-    // wait for segments to be handed off
-    try {
-      RetryUtil.retryUntil(
-          new Callable<Boolean>()
-          {
-            @Override
-            public Boolean call()
-            {
-              return coordinator.areSegmentsLoaded(fullDatasourceName);
-            }
-          },
-          true,
-          10000,
-          30,
-          "Real-time generated segments loaded"
-      );
-    }
-    catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-    LOG.info("segments are present");
-    segmentsExist = true;
-
-    // put the timestamps into the query structure
-    String queryResponseTemplate;
-    InputStream is = ITKafkaTest.class.getResourceAsStream(QUERIES_FILE);
-    if (null == is) {
-      throw new ISE("could not open query file: %s", QUERIES_FILE);
-    }
-
-    try {
-      queryResponseTemplate = IOUtils.toString(is, "UTF-8");
-    }
-    catch (IOException e) {
-      throw new ISE(e, "could not read query file: %s", QUERIES_FILE);
-    }
-
-    String queryStr = queryResponseTemplate;
-    queryStr = StringUtils.replace(queryStr, "%%DATASOURCE%%", fullDatasourceName);
-    // time boundary
-    queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst));
-    queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast));
-    queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MINTIME%%", TIMESTAMP_FMT.print(dtFirst));
-    // time series
-    queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_QUERY_START%%", INTERVAL_FMT.print(dtFirst));
-    String queryEnd = INTERVAL_FMT.print(dtFirst.plusMinutes(MINUTES_TO_SEND + 2));
-    queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_QUERY_END%%", queryEnd);
-    queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst));
-    queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_ADDED%%", Integer.toString(added));
-    queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_NUMEVENTS%%", Integer.toString(num_events));
-
-    // this query will probably be answered from the realtime task
-    try {
-      this.queryHelper.testQueriesFromString(queryStr, 2);
-    }
-    catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @AfterClass
-  public void afterClass()
-  {
-    LOG.info("teardown");
-    if (config.manageKafkaTopic()) {
-      // delete kafka topic
-      AdminUtils.deleteTopic(zkUtils, TOPIC_NAME);
-    }
-
-    // remove segments
-    if (segmentsExist) {
-      unloadAndKillData(fullDatasourceName);
-    }
-  }
-
-  public void addFilteredProperties(Properties properties)
-  {
-    for (Map.Entry<String, String> entry : config.getProperties().entrySet()) {
-      if (entry.getKey().startsWith(testPropertyPrefix)) {
-        properties.put(entry.getKey().substring(testPropertyPrefix.length()), entry.getValue());
-      }
-    }
-  }
-}
-
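
ITKafkaTest drove the deprecated kafka-0.8 firehose through ZooKeeper-based consumer settings, which have no equivalent in the modern client; its coverage moves to the Kafka indexing service tests. For orientation, a hedged sketch of a Kafka 2.x consumer configured the way transactional topics require (bootstrap address, group id, and topic name are placeholders, not values from this patch):

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    // Sketch only, not code from this commit: isolation.level=read_committed
    // makes poll() return only records from committed transactions, skipping
    // aborted ones.
    public class TransactionalConsumerSketch
    {
      public static void main(String[] args)
      {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("group.id", "druid-it-consumer");       // placeholder group
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("isolation.level", "read_committed");

        try (KafkaConsumer<String, String> consumer =
                 new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer())) {
          consumer.subscribe(Collections.singletonList("kafkaTopic")); // placeholder topic
        }
      }
    }
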
diff --git a/integration-tests/src/test/resources/indexer/kafka_index_task.json b/integration-tests/src/test/resources/indexer/kafka_index_task.json
deleted file mode 100644
index 55e28c7..0000000
--- a/integration-tests/src/test/resources/indexer/kafka_index_task.json
+++ /dev/null
@@ -1,68 +0,0 @@
-{
-  "type" : "index_realtime",
-  "spec" : {
-    "dataSchema": {
-      "dataSource": "%%DATASOURCE%%",
-      "parser" : {
-        "type" : "string",
-        "parseSpec" : {
-          "format" : "json",
-          "timestampSpec" : {
-            "column" : "timestamp",
-            "format" : "auto"
-          },
-          "dimensionsSpec" : {
-            "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
-            "dimensionExclusions" : [],
-            "spatialDimensions" : []
-          }
-        }
-      },
-      "metricsSpec": [
-        {
-          "type": "count",
-          "name": "count"
-        },
-        {
-          "type": "doubleSum",
-          "name": "added",
-          "fieldName": "added"
-        },
-        {
-          "type": "doubleSum",
-          "name": "deleted",
-          "fieldName": "deleted"
-        },
-        {
-          "type": "doubleSum",
-          "name": "delta",
-          "fieldName": "delta"
-        }
-      ],
-      "granularitySpec": {
-        "type" : "uniform",
-	"segmentGranularity": "MINUTE",
-        "queryGranularity": "NONE"
-      }
-    },
-    "ioConfig" : {
-      "type" : "realtime",
-      "firehose": {
-        "type": "fixedCount",
-        "count": "%%COUNT%%",
-        "delegate": {
-          "type": "kafka-0.8",
-	    "consumerProps": %%CONSUMER_PROPERTIES%%,
-          "feed": "%%TOPIC%%"
-        }
-      }
-    },
-    "tuningConfig": {
-      "type" : "realtime",
-      "maxRowsInMemory": 500000,
-      "intermediatePersistPeriod": "PT3M",
-      "windowPeriod": "PT150S",
-      "basePersistDirectory": "/home/y/var/druid_state/kafka_test/realtime/basePersist"
-    }
-  }
-}
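
This spec wired the kafka-0.8 firehose into an index_realtime task. Equivalent ingestion is now defined through a Kafka indexing service supervisor spec, whose consumer reads transactional topics with read-committed semantics after this change. A heavily trimmed sketch of the spec shape (values are placeholders and the dataSchema is elided; consult the kafka-ingestion docs rather than this sketch for the full format):

    {
      "type": "kafka",
      "dataSchema": { "dataSource": "%%DATASOURCE%%" },
      "tuningConfig": { "type": "kafka" },
      "ioConfig": {
        "topic": "%%TOPIC%%",
        "consumerProperties": { "bootstrap.servers": "localhost:9092" }
      }
    }
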
diff --git a/pom.xml b/pom.xml
index 351e735..95ca07b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -696,7 +696,7 @@
             <dependency>
                 <groupId>org.slf4j</groupId>
                 <artifactId>slf4j-api</artifactId>
-                <version>1.6.4</version>
+                <version>1.7.25</version>
             </dependency>
             <dependency>
                 <groupId>org.roaringbitmap</groupId>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org