You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2019/03/13 21:29:46 UTC

[incubator-druid] branch master updated: fix SequenceMetadata deserialization (#7256)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new fb1489d  fix SequenceMetadata deserialization (#7256)
fb1489d is described below

commit fb1489d31380106bf4002a1df816fef7dc99c05b
Author: Clint Wylie <cj...@gmail.com>
AuthorDate: Wed Mar 13 14:29:39 2019 -0700

    fix SequenceMetadata deserialization (#7256)
    
    * wip
    
    * fix tests, stop reading if we are at end offset
    
    * fix build
    
    * remove restore at end offsets fix in favor of a separate PR
    
    * use typereference from method for serialization too
---
 .../IncrementalPublishingKafkaIndexTaskRunner.java |  11 +-
 .../indexing/kafka/LegacyKafkaIndexTaskRunner.java |  12 +-
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   | 181 ++++++++---
 .../indexing/kinesis/KinesisIndexTaskRunner.java   |  11 +-
 .../indexing/kinesis/KinesisIndexTaskTest.java     | 247 +++++++++++---
 .../SeekableStreamIndexTaskRunner.java             | 360 ++-------------------
 .../indexing/seekablestream/SequenceMetadata.java  | 335 +++++++++++++++++++
 .../supervisor/SeekableStreamSupervisor.java       |   3 +-
 8 files changed, 754 insertions(+), 406 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index 6424c29..ae092d5 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -28,6 +28,7 @@ import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
 import org.apache.druid.indexing.seekablestream.SeekableStreamPartitions;
+import org.apache.druid.indexing.seekablestream.SequenceMetadata;
 import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
@@ -111,7 +112,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner extends SeekableStreamInd
   }
 
   @Override
-  protected SeekableStreamPartitions<Integer, Long> deserializeSeekableStreamPartitionsFromMetadata(
+  protected SeekableStreamPartitions<Integer, Long> deserializePartitionsFromMetadata(
       ObjectMapper mapper,
       Object object
   )
@@ -225,6 +226,14 @@ public class IncrementalPublishingKafkaIndexTaskRunner extends SeekableStreamInd
     return false;
   }
 
+  @Override
+  public TypeReference<List<SequenceMetadata<Integer, Long>>> getSequenceMetadataTypeReference()
+  {
+    return new TypeReference<List<SequenceMetadata<Integer, Long>>>()
+    {
+    };
+  }
+
   @Nullable
   @Override
   protected TreeMap<Integer, Map<Integer, Long>> getCheckPointsFromContext(
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
index c822088..1082473 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.kafka;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -50,6 +51,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
 import org.apache.druid.indexing.seekablestream.SeekableStreamPartitions;
+import org.apache.druid.indexing.seekablestream.SequenceMetadata;
 import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
@@ -610,6 +612,14 @@ public class LegacyKafkaIndexTaskRunner extends SeekableStreamIndexTaskRunner<In
     return false;
   }
 
+  @Override
+  public TypeReference<List<SequenceMetadata<Integer, Long>>> getSequenceMetadataTypeReference()
+  {
+    return new TypeReference<List<SequenceMetadata<Integer, Long>>>()
+    {
+    };
+  }
+
   @Nonnull
   @Override
   protected List<OrderedPartitionableRecord<Integer, Long>> getRecords(
@@ -709,7 +719,7 @@ public class LegacyKafkaIndexTaskRunner extends SeekableStreamIndexTaskRunner<In
 
 
   @Override
-  protected SeekableStreamPartitions<Integer, Long> deserializeSeekableStreamPartitionsFromMetadata(
+  protected SeekableStreamPartitions<Integer, Long> deserializePartitionsFromMetadata(
       ObjectMapper mapper,
       Object object
   )
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index a0caebc..070396f 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -293,6 +293,26 @@ public class KafkaIndexTaskTest
     );
   }
 
+  private static List<ProducerRecord<byte[], byte[]>> generateSinglePartitionRecords(String topic)
+  {
+    return ImmutableList.of(
+        new ProducerRecord<>(topic, 0, null, jb("2008", "a", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, jb("2009", "b", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, jb("2010", "c", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, jb("2011", "D", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, jb("2012", "e", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, jb("2009", "B", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, jb("2008", "A", "x", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, jb("2009", "B", "x", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, jb("2010", "C", "x", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, jb("2011", "D", "x", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, jb("2011", "d", "x", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, jb("2012", "E", "x", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, jb("2009", "b", "x", "10", "20.0", "1.0"))
+    );
+  }
+
   private static String getTopicName()
   {
     return "topic" + topicPostfix++;
@@ -694,7 +714,7 @@ public class KafkaIndexTaskTest
     }
     final Map<Integer, Long> nextOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
 
-   
+
     Assert.assertTrue(checkpoint2.getPartitionSequenceNumberMap().equals(nextOffsets));
     task.getRunner().setEndOffsets(nextOffsets, false);
 
@@ -726,7 +746,7 @@ public class KafkaIndexTaskTest
     Assert.assertEquals(8, task.getRunner().getRowIngestionMeters().getProcessed());
     Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getUnparseable());
     Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getThrownAway());
-    
+
     // Check published metadata
     SegmentDescriptor desc1 = sd(task, "2008/P1D", 0);
     SegmentDescriptor desc2 = sd(task, "2009/P1D", 0);
@@ -739,7 +759,7 @@ public class KafkaIndexTaskTest
     Assert.assertEquals(
           new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L, 1, 2L))),
           metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()));
-    
+
     Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
     Assert.assertEquals(
         new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L, 1, 2L))),
@@ -867,23 +887,7 @@ public class KafkaIndexTaskTest
     if (!isIncrementalHandoffSupported) {
       return;
     }
-
-    List<ProducerRecord<byte[], byte[]>> records = ImmutableList.of(
-        new ProducerRecord<>(topic, 0, null, jb("2008", "a", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("2009", "b", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("2010", "c", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("2011", "D", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("2012", "e", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("2009", "B", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("2008", "A", "x", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("2009", "B", "x", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("2010", "C", "x", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("2011", "D", "x", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("2011", "d", "x", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("2012", "E", "x", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("2009", "b", "x", "10", "20.0", "1.0"))
-    );
+    records = generateSinglePartitionRecords(topic);
 
     final String baseSequenceName = "sequence0";
     // as soon as any segment has more than one record, incremental publishing should happen
@@ -901,22 +905,14 @@ public class KafkaIndexTaskTest
     Map<String, Object> consumerProps = kafkaServer.consumerProperties();
     consumerProps.put("max.poll.records", "1");
 
-    final SeekableStreamPartitions<Integer, Long> startPartitions = new SeekableStreamPartitions<>(
-        topic,
-        ImmutableMap.of(0, 0L)
-    );
-    final SeekableStreamPartitions<Integer, Long> checkpoint1 = new SeekableStreamPartitions<>(
-        topic,
-        ImmutableMap.of(0, 5L)
-    );
-    final SeekableStreamPartitions<Integer, Long> checkpoint2 = new SeekableStreamPartitions<>(
-        topic,
-        ImmutableMap.of(0, 12L)
-    );
-    final SeekableStreamPartitions<Integer, Long> endPartitions = new SeekableStreamPartitions<>(
-        topic,
-        ImmutableMap.of(0, Long.MAX_VALUE)
-    );
+    final SeekableStreamPartitions<Integer, Long> startPartitions =
+        new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L));
+    final SeekableStreamPartitions<Integer, Long> checkpoint1 =
+        new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L));
+    final SeekableStreamPartitions<Integer, Long> checkpoint2 =
+        new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 12L));
+    final SeekableStreamPartitions<Integer, Long> endPartitions =
+        new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, Long.MAX_VALUE));
 
     final KafkaIndexTask task = createTask(
         null,
@@ -1860,6 +1856,119 @@ public class KafkaIndexTaskTest
   }
 
   @Test(timeout = 60_000L)
+  public void testRestoreAfterPersistingSequences() throws Exception
+  {
+    if (!isIncrementalHandoffSupported) {
+      return;
+    }
+
+    records = generateSinglePartitionRecords(topic);
+    maxRowsPerSegment = 2;
+    Map<String, Object> consumerProps = kafkaServer.consumerProperties();
+    consumerProps.put("max.poll.records", "1");
+
+    final KafkaIndexTask task1 = createTask(
+        null,
+        new KafkaIndexTaskIOConfig(
+            0,
+            "sequence0",
+            new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
+            new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L)),
+            consumerProps,
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null
+        )
+    );
+
+    final SeekableStreamPartitions<Integer, Long> checkpoint = new SeekableStreamPartitions<>(
+        topic,
+        ImmutableMap.of(0, 5L)
+    );
+
+    final ListenableFuture<TaskStatus> future1 = runTask(task1);
+
+    // Insert some data, but not enough for the task to finish
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+      for (ProducerRecord<byte[], byte[]> record : Iterables.limit(records, 5)) {
+        kafkaProducer.send(record).get();
+      }
+      kafkaProducer.commitTransaction();
+    }
+
+    while (task1.getRunner().getStatus() != Status.PAUSED) {
+      Thread.sleep(10);
+    }
+    final Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task1.getRunner().getCurrentOffsets());
+    Assert.assertEquals(checkpoint.getPartitionSequenceNumberMap(), currentOffsets);
+    // Set endOffsets to persist sequences
+    task1.getRunner().setEndOffsets(ImmutableMap.of(0, 5L), false);
+
+    // Stop without publishing segment
+    task1.stopGracefully(toolboxFactory.build(task1).getConfig());
+    unlockAppenderatorBasePersistDirForTask(task1);
+
+    Assert.assertEquals(TaskState.SUCCESS, future1.get().getStatusCode());
+
+    // Start a new task
+    final KafkaIndexTask task2 = createTask(
+        task1.getId(),
+        new KafkaIndexTaskIOConfig(
+            0,
+            "sequence0",
+            new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
+            new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L)),
+            consumerProps,
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null
+        )
+    );
+
+    final ListenableFuture<TaskStatus> future2 = runTask(task2);
+    // Wait for the task to start reading
+
+    // Insert remaining data
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+      for (ProducerRecord<byte[], byte[]> record : Iterables.skip(records, 5)) {
+        kafkaProducer.send(record).get();
+      }
+      kafkaProducer.commitTransaction();
+    }
+
+    // Wait for task to exit
+    Assert.assertEquals(TaskState.SUCCESS, future2.get().getStatusCode());
+
+    // Check metrics
+    Assert.assertEquals(5, task1.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getUnparseable());
+    Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getThrownAway());
+    Assert.assertEquals(4, task2.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getUnparseable());
+    Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway());
+
+    // Check published segments & metadata
+    SegmentDescriptor desc1 = sd(task1, "2008/P1D", 0);
+    SegmentDescriptor desc2 = sd(task1, "2008/P1D", 1);
+    SegmentDescriptor desc3 = sd(task1, "2009/P1D", 0);
+    SegmentDescriptor desc4 = sd(task1, "2009/P1D", 1);
+    SegmentDescriptor desc5 = sd(task1, "2010/P1D", 0);
+    SegmentDescriptor desc6 = sd(task1, "2011/P1D", 0);
+    SegmentDescriptor desc7 = sd(task1, "2012/P1D", 0);
+    Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
+    Assert.assertEquals(
+        new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L))),
+        metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
+    );
+  }
+
+  @Test(timeout = 60_000L)
   public void testRunWithPauseAndResume() throws Exception
   {
     final KafkaIndexTask task = createTask(
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
index 3e7e5e7..50326f7 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
@@ -28,6 +28,7 @@ import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
 import org.apache.druid.indexing.seekablestream.SeekableStreamPartitions;
+import org.apache.druid.indexing.seekablestream.SequenceMetadata;
 import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
@@ -92,7 +93,7 @@ public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner<String
   }
 
   @Override
-  protected SeekableStreamPartitions<String, String> deserializeSeekableStreamPartitionsFromMetadata(
+  protected SeekableStreamPartitions<String, String> deserializePartitionsFromMetadata(
       ObjectMapper mapper,
       Object object
   )
@@ -176,6 +177,14 @@ public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner<String
     return KinesisSequenceNumber.END_OF_SHARD_MARKER.equals(seqNum);
   }
 
+  @Override
+  public TypeReference<List<SequenceMetadata<String, String>>> getSequenceMetadataTypeReference()
+  {
+    return new TypeReference<List<SequenceMetadata<String, String>>>()
+    {
+    };
+  }
+
   @Nullable
   @Override
   protected TreeMap<Integer, Map<String, String>> getCheckPointsFromContext(
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index d4c75a2..857b4a7 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -198,28 +198,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
   private static String shardId1 = "1";
   private static String shardId0 = "0";
   private static KinesisRecordSupplier recordSupplier;
-  private static List<OrderedPartitionableRecord<String, String>> records = ImmutableList.of(
-      new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")),
-      new OrderedPartitionableRecord<>(stream, "1", "1", jb("2009", "b", "y", "10", "20.0", "1.0")),
-      new OrderedPartitionableRecord<>(stream, "1", "2", jb("2010", "c", "y", "10", "20.0", "1.0")),
-      new OrderedPartitionableRecord<>(stream, "1", "3", jb("2011", "d", "y", "10", "20.0", "1.0")),
-      new OrderedPartitionableRecord<>(stream, "1", "4", jb("2011", "e", "y", "10", "20.0", "1.0")),
-      new OrderedPartitionableRecord<>(
-          stream,
-          "1",
-          "5",
-          jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")
-      ),
-      new OrderedPartitionableRecord<>(stream, "1", "6", Collections.singletonList(StringUtils.toUtf8("unparseable"))),
-      new OrderedPartitionableRecord<>(stream, "1", "7", Collections.singletonList(StringUtils.toUtf8("unparseable2"))),
-      new OrderedPartitionableRecord<>(stream, "1", "8", Collections.singletonList(StringUtils.toUtf8("{}"))),
-      new OrderedPartitionableRecord<>(stream, "1", "9", jb("2013", "f", "y", "10", "20.0", "1.0")),
-      new OrderedPartitionableRecord<>(stream, "1", "10", jb("2049", "f", "y", "notanumber", "20.0", "1.0")),
-      new OrderedPartitionableRecord<>(stream, "1", "11", jb("2049", "f", "y", "10", "notanumber", "1.0")),
-      new OrderedPartitionableRecord<>(stream, "1", "12", jb("2049", "f", "y", "10", "20.0", "notanumber")),
-      new OrderedPartitionableRecord<>(stream, "0", "0", jb("2012", "g", "y", "10", "20.0", "1.0")),
-      new OrderedPartitionableRecord<>(stream, "0", "1", jb("2011", "h", "y", "10", "20.0", "1.0"))
-  );
+  private static List<OrderedPartitionableRecord<String, String>> records;
 
   private static ServiceEmitter emitter;
   private static ListeningExecutorService taskExec;
@@ -315,6 +294,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     maxSavedParseExceptions = null;
     skipAvailabilityCheck = false;
     doHandoff = true;
+    records = generateRecords(stream);
     reportsFile = File.createTempFile("KinesisIndexTaskTestReports-" + System.currentTimeMillis(), "json");
     maxRecordsPerPoll = 1;
 
@@ -347,6 +327,52 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     emitter.close();
   }
 
+  private static List<OrderedPartitionableRecord<String, String>> generateRecords(String stream)
+  {
+    return ImmutableList.of(
+        new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "1", jb("2009", "b", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "2", jb("2010", "c", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "3", jb("2011", "d", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "4", jb("2011", "e", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(
+            stream,
+            "1",
+            "5",
+            jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")
+        ),
+        new OrderedPartitionableRecord<>(stream, "1", "6", Collections.singletonList(StringUtils.toUtf8("unparseable"))),
+        new OrderedPartitionableRecord<>(stream, "1", "7", Collections.singletonList(StringUtils.toUtf8("unparseable2"))),
+        new OrderedPartitionableRecord<>(stream, "1", "8", Collections.singletonList(StringUtils.toUtf8("{}"))),
+        new OrderedPartitionableRecord<>(stream, "1", "9", jb("2013", "f", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "10", jb("2049", "f", "y", "notanumber", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "11", jb("2049", "f", "y", "10", "notanumber", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "12", jb("2049", "f", "y", "10", "20.0", "notanumber")),
+        new OrderedPartitionableRecord<>(stream, "0", "0", jb("2012", "g", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "0", "1", jb("2011", "h", "y", "10", "20.0", "1.0"))
+    );
+  }
+
+  private static List<OrderedPartitionableRecord<String, String>> generateSinglePartitionRecords(String stream)
+  {
+    return ImmutableList.of(
+        new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "1", jb("2009", "b", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "2", jb("2010", "c", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "3", jb("2011", "d", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "4", jb("2011", "e", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "5", jb("2012", "a", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "6", jb("2013", "b", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "7", jb("2010", "c", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "8", jb("2011", "d", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "9", jb("2011", "e", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "10", jb("2008", "a", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "11", jb("2009", "b", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "12", jb("2010", "c", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "13", jb("2012", "d", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "14", jb("2013", "e", "y", "10", "20.0", "1.0"))
+    );
+  }
   @Test(timeout = 120_000L)
   public void testRunAfterDataInserted() throws Exception
   {
@@ -2213,6 +2239,165 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
   }
 
+  @Test(timeout = 120_000L)
+  public void testRestoreAfterPersistingSequences() throws Exception
+  {
+    maxRowsPerSegment = 2;
+    maxRecordsPerPoll = 1;
+    records = generateSinglePartitionRecords(stream);
+
+    recordSupplier.assign(anyObject());
+    expectLastCall().anyTimes();
+
+    expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();
+
+    recordSupplier.seek(anyObject(), anyString());
+    expectLastCall().anyTimes();
+
+    // simulate 1 record at a time
+    expect(recordSupplier.poll(anyLong())).andReturn(Collections.singletonList(records.get(0)))
+                                          .once()
+                                          .andReturn(Collections.singletonList(records.get(1)))
+                                          .once()
+                                          .andReturn(Collections.singletonList(records.get(2)))
+                                          .once()
+                                          .andReturn(Collections.singletonList(records.get(3)))
+                                          .once()
+                                          .andReturn(Collections.singletonList(records.get(4)))
+                                          .once()
+                                          .andReturn(Collections.emptyList())
+                                          .anyTimes();
+
+    replayAll();
+
+    final KinesisIndexTask task1 = createTask(
+        "task1",
+        new KinesisIndexTaskIOConfig(
+            null,
+            "sequence0",
+            new SeekableStreamPartitions<>(stream, ImmutableMap.of(
+                shardId1,
+                "0"
+            )),
+            new SeekableStreamPartitions<>(stream, ImmutableMap.of(
+                shardId1,
+                "6"
+            )),
+            true,
+            null,
+            null,
+            "awsEndpoint",
+            null,
+            null,
+            null,
+            null,
+            null,
+            false
+        )
+    );
+
+    final SeekableStreamPartitions<String, String> checkpoint1 = new SeekableStreamPartitions<>(
+        stream,
+        ImmutableMap.of(shardId1, "4")
+    );
+
+    final ListenableFuture<TaskStatus> future1 = runTask(task1);
+
+    while (task1.getRunner().getStatus() != SeekableStreamIndexTaskRunner.Status.PAUSED) {
+      Thread.sleep(10);
+    }
+    final Map<String, String> currentOffsets = ImmutableMap.copyOf(task1.getRunner().getCurrentOffsets());
+    Assert.assertEquals(checkpoint1.getPartitionSequenceNumberMap(), currentOffsets);
+    task1.getRunner().setEndOffsets(currentOffsets, false);
+
+    // Stop without publishing segment
+    task1.stopGracefully(toolboxFactory.build(task1).getConfig());
+    unlockAppenderatorBasePersistDirForTask(task1);
+
+    Assert.assertEquals(TaskState.SUCCESS, future1.get().getStatusCode());
+
+    verifyAll();
+    reset(recordSupplier);
+
+    recordSupplier.assign(anyObject());
+    expectLastCall().anyTimes();
+
+    expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();
+
+
+    recordSupplier.seek(anyObject(), anyString());
+    expectLastCall().anyTimes();
+
+    expect(recordSupplier.poll(anyLong())).andReturn(Collections.singletonList(records.get(5)))
+                                          .once()
+                                          .andReturn(Collections.singletonList(records.get(6)))
+                                          .once()
+                                          .andReturn(Collections.emptyList())
+                                          .anyTimes();
+
+    recordSupplier.close();
+    expectLastCall();
+
+    replayAll();
+
+    // Start a new task
+    final KinesisIndexTask task2 = createTask(
+        task1.getId(),
+        new KinesisIndexTaskIOConfig(
+            null,
+            "sequence0",
+            new SeekableStreamPartitions<>(stream, ImmutableMap.of(
+                shardId1,
+                "0"
+            )),
+            new SeekableStreamPartitions<>(stream, ImmutableMap.of(
+                shardId1,
+                "6"
+            )),
+            true,
+            null,
+            null,
+            "awsEndpoint",
+            null,
+            null,
+            ImmutableSet.of(shardId1),
+            null,
+            null,
+            false
+        )
+    );
+
+    final ListenableFuture<TaskStatus> future2 = runTask(task2);
+
+    // Wait for task to exit
+    Assert.assertEquals(TaskState.SUCCESS, future2.get().getStatusCode());
+
+    verifyAll();
+
+    // Check metrics
+    Assert.assertEquals(5, task1.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getUnparseable());
+    Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getThrownAway());
+    Assert.assertEquals(1, task2.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getUnparseable());
+    Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway());
+
+    // Check published segments & metadata
+    SegmentDescriptor desc1 = sd(task1, "2008/P1D", 0);
+    SegmentDescriptor desc2 = sd(task1, "2009/P1D", 0);
+    SegmentDescriptor desc3 = sd(task1, "2010/P1D", 0);
+    SegmentDescriptor desc4 = sd(task1, "2011/P1D", 0);
+    SegmentDescriptor desc5 = sd(task1, "2013/P1D", 0);
+    Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5), publishedDescriptors());
+    Assert.assertEquals(
+        new KinesisDataSourceMetadata(
+            new SeekableStreamPartitions<>(stream, ImmutableMap.of(
+                shardId1,
+                "6"
+            ))),
+        metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
+    );
+  }
 
   @Test(timeout = 120_000L)
   public void testRunWithPauseAndResume() throws Exception
@@ -2426,23 +2611,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
   @Test(timeout = 5000L)
   public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception
   {
-    final List<OrderedPartitionableRecord<String, String>> records = ImmutableList.of(
-        new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")),
-        new OrderedPartitionableRecord<>(stream, "1", "1", jb("2009", "b", "y", "10", "20.0", "1.0")),
-        new OrderedPartitionableRecord<>(stream, "1", "2", jb("2010", "c", "y", "10", "20.0", "1.0")),
-        new OrderedPartitionableRecord<>(stream, "1", "3", jb("2011", "d", "y", "10", "20.0", "1.0")),
-        new OrderedPartitionableRecord<>(stream, "1", "4", jb("2011", "e", "y", "10", "20.0", "1.0")),
-        new OrderedPartitionableRecord<>(stream, "1", "5", jb("2012", "a", "y", "10", "20.0", "1.0")),
-        new OrderedPartitionableRecord<>(stream, "1", "6", jb("2013", "b", "y", "10", "20.0", "1.0")),
-        new OrderedPartitionableRecord<>(stream, "1", "7", jb("2010", "c", "y", "10", "20.0", "1.0")),
-        new OrderedPartitionableRecord<>(stream, "1", "8", jb("2011", "d", "y", "10", "20.0", "1.0")),
-        new OrderedPartitionableRecord<>(stream, "1", "9", jb("2011", "e", "y", "10", "20.0", "1.0")),
-        new OrderedPartitionableRecord<>(stream, "1", "10", jb("2008", "a", "y", "10", "20.0", "1.0")),
-        new OrderedPartitionableRecord<>(stream, "1", "11", jb("2009", "b", "y", "10", "20.0", "1.0")),
-        new OrderedPartitionableRecord<>(stream, "1", "12", jb("2010", "c", "y", "10", "20.0", "1.0")),
-        new OrderedPartitionableRecord<>(stream, "1", "13", jb("2012", "d", "y", "10", "20.0", "1.0")),
-        new OrderedPartitionableRecord<>(stream, "1", "14", jb("2013", "e", "y", "10", "20.0", "1.0"))
-    );
+    records = generateSinglePartitionRecords(stream);
 
     final String baseSequenceName = "sequence0";
     // as soon as any segment has more than one record, incremental publishing should happen
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 367b201..4fca19d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -20,8 +20,6 @@
 package org.apache.druid.indexing.seekablestream;
 
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -53,7 +51,6 @@ import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.CheckPointDataSourceMetadataAction;
 import org.apache.druid.indexing.common.actions.ResetDataSourceMetadataAction;
-import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
 import org.apache.druid.indexing.common.stats.RowIngestionMeters;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.IndexTaskUtils;
@@ -75,7 +72,6 @@ import org.apache.druid.segment.realtime.appenderator.Appenderator;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
 import org.apache.druid.segment.realtime.appenderator.SegmentsAndMetadata;
 import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
-import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
 import org.apache.druid.segment.realtime.firehose.ChatHandler;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.server.security.Access;
@@ -142,8 +138,8 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   }
 
   private static final EmittingLogger log = new EmittingLogger(SeekableStreamIndexTaskRunner.class);
-  private static final String METADATA_NEXT_PARTITIONS = "nextPartitions";
-  private static final String METADATA_PUBLISH_PARTITIONS = "publishPartitions";
+  static final String METADATA_NEXT_PARTITIONS = "nextPartitions";
+  static final String METADATA_PUBLISH_PARTITIONS = "publishPartitions";
 
   private final Map<PartitionIdType, SequenceOffsetType> endOffsets;
   private final ConcurrentMap<PartitionIdType, SequenceOffsetType> currOffsets = new ConcurrentHashMap<>();
@@ -210,7 +206,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   protected volatile boolean pauseRequested = false;
   private volatile long nextCheckpointTime;
 
-  private volatile CopyOnWriteArrayList<SequenceMetadata> sequences;
+  private volatile CopyOnWriteArrayList<SequenceMetadata<PartitionIdType, SequenceOffsetType>> sequences;
   private volatile Throwable backgroundThreadException;
 
   public SeekableStreamIndexTaskRunner(
@@ -276,7 +272,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
         Map.Entry<Integer, Map<PartitionIdType, SequenceOffsetType>> previous = sequenceOffsets.next();
         while (sequenceOffsets.hasNext()) {
           Map.Entry<Integer, Map<PartitionIdType, SequenceOffsetType>> current = sequenceOffsets.next();
-          sequences.add(new SequenceMetadata(
+          sequences.add(new SequenceMetadata<>(
               previous.getKey(),
               StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
               previous.getValue(),
@@ -287,7 +283,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
           previous = current;
           exclusive = true;
         }
-        sequences.add(new SequenceMetadata(
+        sequences.add(new SequenceMetadata<>(
             previous.getKey(),
             StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
             previous.getValue(),
@@ -296,7 +292,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
             exclusive ? previous.getValue().keySet() : null
         ));
       } else {
-        sequences.add(new SequenceMetadata(
+        sequences.add(new SequenceMetadata<>(
             0,
             StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0),
             ioConfig.getStartPartitions().getPartitionSequenceNumberMap(),
@@ -369,7 +365,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
       } else {
         @SuppressWarnings("unchecked")
         final Map<String, Object> restoredMetadataMap = (Map) restoredMetadata;
-        final SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> restoredNextPartitions = deserializeSeekableStreamPartitionsFromMetadata(
+        final SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> restoredNextPartitions = deserializePartitionsFromMetadata(
             toolbox.getObjectMapper(),
             restoredMetadataMap.get(METADATA_NEXT_PARTITIONS)
         );
@@ -543,9 +539,9 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
                 }
                 boolean isPersistRequired = false;
 
-                final SequenceMetadata sequenceToUse = sequences
+                final SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceToUse = sequences
                     .stream()
-                    .filter(sequenceMetadata -> sequenceMetadata.canHandle(record))
+                    .filter(sequenceMetadata -> sequenceMetadata.canHandle(this, record))
                     .findFirst()
                     .orElse(null);
 
@@ -692,11 +688,11 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
         status = Status.PUBLISHING;
       }
 
-      for (SequenceMetadata sequenceMetadata : sequences) {
+      for (SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceMetadata : sequences) {
         if (!publishingSequences.contains(sequenceMetadata.getSequenceName())) {
           // this is done to prevent checks in sequence specific commit supplier from failing
           sequenceMetadata.setEndOffsets(currOffsets);
-          sequenceMetadata.updateAssignments(currOffsets);
+          sequenceMetadata.updateAssignments(this, currOffsets);
           publishingSequences.add(sequenceMetadata.getSequenceName());
           // persist already done in finally, so directly add to publishQueue
           publishAndRegisterHandoff(sequenceMetadata);
@@ -812,11 +808,6 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     return TaskStatus.success(task.getId());
   }
 
-  /**
-   * checks if the input seqNum marks end of shard. Used by Kinesis only
-   */
-  protected abstract boolean isEndOfShard(SequenceOffsetType seqNum);
-
   private void checkPublishAndHandoffFailure() throws ExecutionException, InterruptedException
   {
     // Check if any publishFuture failed.
@@ -846,14 +837,14 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     handOffWaitList.removeAll(handoffFinished);
   }
 
-  private void publishAndRegisterHandoff(SequenceMetadata sequenceMetadata)
+  private void publishAndRegisterHandoff(SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceMetadata)
   {
     log.info("Publishing segments for sequence [%s]", sequenceMetadata);
 
     final ListenableFuture<SegmentsAndMetadata> publishFuture = Futures.transform(
         driver.publish(
-            sequenceMetadata.createPublisher(toolbox, ioConfig.isUseTransaction()),
-            sequenceMetadata.getCommitterSupplier(stream, lastPersistedOffsets).get(),
+            sequenceMetadata.createPublisher(this, toolbox, ioConfig.isUseTransaction()),
+            sequenceMetadata.getCommitterSupplier(this, stream, lastPersistedOffsets).get(),
             Collections.singletonList(sequenceMetadata.getSequenceName())
         ),
         (Function<SegmentsAndMetadata, SegmentsAndMetadata>) publishedSegmentsAndMetadata -> {
@@ -938,11 +929,9 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     final File sequencesPersistFile = getSequencesPersistFile(toolbox);
     if (sequencesPersistFile.exists()) {
       sequences = new CopyOnWriteArrayList<>(
-          toolbox.getObjectMapper().<List<SequenceMetadata>>readValue(
+          toolbox.getObjectMapper().<List<SequenceMetadata<PartitionIdType, SequenceOffsetType>>>readValue(
               sequencesPersistFile,
-              new TypeReference<List<SequenceMetadata>>()
-              {
-              }
+              getSequenceMetadataTypeReference()
           )
       );
       return true;
@@ -955,9 +944,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   {
     log.info("Persisting Sequences Metadata [%s]", sequences);
     toolbox.getObjectMapper().writerWithType(
-        new TypeReference<List<SequenceMetadata>>()
-        {
-        }
+        getSequenceMetadataTypeReference()
     ).writeValue(getSequencesPersistFile(toolbox), sequences);
   }
 
@@ -1002,8 +989,8 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   private void maybePersistAndPublishSequences(Supplier<Committer> committerSupplier)
       throws InterruptedException
   {
-    for (SequenceMetadata sequenceMetadata : sequences) {
-      sequenceMetadata.updateAssignments(currOffsets);
+    for (SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceMetadata : sequences) {
+      sequenceMetadata.updateAssignments(this, currOffsets);
       if (!sequenceMetadata.isOpen() && !publishingSequences.contains(sequenceMetadata.getSequenceName())) {
         publishingSequences.add(sequenceMetadata.getSequenceName());
         try {
@@ -1378,7 +1365,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
         // and after acquiring pauseLock to correctly guard against duplicate requests
         Preconditions.checkState(sequenceNumbers.size() > 0, "WTH?! No Sequences found to set end sequences");
 
-        final SequenceMetadata latestSequence = sequences.get(sequences.size() - 1);
+        final SequenceMetadata<PartitionIdType, SequenceOffsetType> latestSequence = sequences.get(sequences.size() - 1);
         // if a partition has not been read yet (contained in initialOffsetsSnapshot), then
         // do not mark the starting sequence number as exclusive
         Set<PartitionIdType> exclusivePartitions = sequenceNumbers.keySet()
@@ -1389,7 +1376,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
                                                                   .collect(Collectors.toSet());
 
         if ((latestSequence.getStartOffsets().equals(sequenceNumbers)
-             && latestSequence.exclusiveStartPartitions.equals(exclusivePartitions)
+             && latestSequence.getExclusiveStartPartitions().equals(exclusivePartitions)
              && !finish)
             || (latestSequence.getEndOffsets().equals(sequenceNumbers) && finish)) {
           log.warn("Ignoring duplicate request, end sequences already set for sequences [%s]", sequenceNumbers);
@@ -1409,8 +1396,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
         }
 
         for (Map.Entry<PartitionIdType, SequenceOffsetType> entry : sequenceNumbers.entrySet()) {
-          if (createSequenceNumber(entry.getValue()).compareTo(createSequenceNumber(currOffsets.get(entry.getKey())))
-              < 0) {
+          if (createSequenceNumber(entry.getValue()).compareTo(createSequenceNumber(currOffsets.get(entry.getKey()))) < 0) {
             return Response.status(Response.Status.BAD_REQUEST)
                            .entity(
                                StringUtils.format(
@@ -1433,7 +1419,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
           exclusiveStartingPartitions.addAll(exclusivePartitions);
 
           // create new sequence
-          final SequenceMetadata newSequence = new SequenceMetadata(
+          final SequenceMetadata<PartitionIdType, SequenceOffsetType> newSequence = new SequenceMetadata<>(
               latestSequence.getSequenceId() + 1,
               StringUtils.format("%s_%d", ioConfig.getBaseSequenceName(), latestSequence.getSequenceId() + 1),
               sequenceNumbers,
@@ -1596,291 +1582,6 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     return startTime;
   }
 
-  private class SequenceMetadata
-  {
-    private final int sequenceId;
-    private final String sequenceName;
-    private final Set<PartitionIdType> exclusiveStartPartitions;
-    private final Set<PartitionIdType> assignments;
-    private final boolean sentinel;
-    private boolean checkpointed;
-    /**
-     * Lock for accessing {@link #endOffsets} and {@link #checkpointed}. This lock is required because
-     * {@link #setEndOffsets)} can be called by both the main thread and the HTTP thread.
-     */
-    private final ReentrantLock lock = new ReentrantLock();
-
-    final Map<PartitionIdType, SequenceOffsetType> startOffsets;
-    final Map<PartitionIdType, SequenceOffsetType> endOffsets;
-
-    @JsonCreator
-    public SequenceMetadata(
-        @JsonProperty("sequenceId") int sequenceId,
-        @JsonProperty("sequenceName") String sequenceName,
-        @JsonProperty("startOffsets") Map<PartitionIdType, SequenceOffsetType> startOffsets,
-        @JsonProperty("endOffsets") Map<PartitionIdType, SequenceOffsetType> endOffsets,
-        @JsonProperty("checkpointed") boolean checkpointed,
-        @JsonProperty("exclusiveStartPartitions") Set<PartitionIdType> exclusiveStartPartitions
-    )
-    {
-      Preconditions.checkNotNull(sequenceName);
-      Preconditions.checkNotNull(startOffsets);
-      Preconditions.checkNotNull(endOffsets);
-      this.sequenceId = sequenceId;
-      this.sequenceName = sequenceName;
-      this.startOffsets = ImmutableMap.copyOf(startOffsets);
-      this.endOffsets = new HashMap<>(endOffsets);
-      this.assignments = new HashSet<>(startOffsets.keySet());
-      this.checkpointed = checkpointed;
-      this.sentinel = false;
-      this.exclusiveStartPartitions = exclusiveStartPartitions == null
-                                      ? Collections.emptySet()
-                                      : exclusiveStartPartitions;
-    }
-
-    @JsonProperty
-    public Set<PartitionIdType> getExclusiveStartPartitions()
-    {
-      return exclusiveStartPartitions;
-    }
-
-    @JsonProperty
-    public int getSequenceId()
-    {
-      return sequenceId;
-    }
-
-    @JsonProperty
-    public boolean isCheckpointed()
-    {
-      lock.lock();
-      try {
-        return checkpointed;
-      }
-      finally {
-        lock.unlock();
-      }
-    }
-
-    @JsonProperty
-    public String getSequenceName()
-    {
-      return sequenceName;
-    }
-
-    @JsonProperty
-    public Map<PartitionIdType, SequenceOffsetType> getStartOffsets()
-    {
-      return startOffsets;
-    }
-
-    @JsonProperty
-    public Map<PartitionIdType, SequenceOffsetType> getEndOffsets()
-    {
-      lock.lock();
-      try {
-        return endOffsets;
-      }
-      finally {
-        lock.unlock();
-      }
-    }
-
-    @JsonProperty
-    public boolean isSentinel()
-    {
-      return sentinel;
-    }
-
-    void setEndOffsets(Map<PartitionIdType, SequenceOffsetType> newEndOffsets)
-    {
-      lock.lock();
-      try {
-        endOffsets.putAll(newEndOffsets);
-        checkpointed = true;
-      }
-      finally {
-        lock.unlock();
-      }
-    }
-
-    void updateAssignments(Map<PartitionIdType, SequenceOffsetType> nextPartitionOffset)
-    {
-      lock.lock();
-      try {
-        assignments.clear();
-        nextPartitionOffset.forEach((key, value) -> {
-          if (endOffsets.get(key).equals(SeekableStreamPartitions.NO_END_SEQUENCE_NUMBER)
-              || createSequenceNumber(endOffsets.get(key)).compareTo(createSequenceNumber(nextPartitionOffset.get(key)))
-                 > 0) {
-            assignments.add(key);
-          }
-        });
-      }
-      finally {
-        lock.unlock();
-      }
-    }
-
-    boolean isOpen()
-    {
-      return !assignments.isEmpty();
-    }
-
-    boolean canHandle(OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType> record)
-    {
-      lock.lock();
-      try {
-        final OrderedSequenceNumber<SequenceOffsetType> partitionEndOffset = createSequenceNumber(endOffsets.get(record.getPartitionId()));
-        final OrderedSequenceNumber<SequenceOffsetType> partitionStartOffset = createSequenceNumber(startOffsets.get(
-            record.getPartitionId()));
-        final OrderedSequenceNumber<SequenceOffsetType> recordOffset = createSequenceNumber(record.getSequenceNumber());
-        if (!isOpen() || recordOffset == null || partitionEndOffset == null || partitionStartOffset == null) {
-          return false;
-        }
-        boolean ret;
-        if (isStartingSequenceOffsetsExclusive()) {
-          ret = recordOffset.compareTo(partitionStartOffset)
-                >= (getExclusiveStartPartitions().contains(record.getPartitionId()) ? 1 : 0);
-        } else {
-          ret = recordOffset.compareTo(partitionStartOffset) >= 0;
-        }
-
-        if (isEndSequenceOffsetsExclusive()) {
-          ret &= recordOffset.compareTo(partitionEndOffset) < 0;
-        } else {
-          ret &= recordOffset.compareTo(partitionEndOffset) <= 0;
-        }
-
-        return ret;
-      }
-      finally {
-        lock.unlock();
-      }
-    }
-
-    @Override
-    public String toString()
-    {
-      lock.lock();
-      try {
-        return "SequenceMetadata{" +
-               "sequenceName='" + sequenceName + '\'' +
-               ", sequenceId=" + sequenceId +
-               ", startOffsets=" + startOffsets +
-               ", endOffsets=" + endOffsets +
-               ", assignments=" + assignments +
-               ", sentinel=" + sentinel +
-               ", checkpointed=" + checkpointed +
-               '}';
-      }
-      finally {
-        lock.unlock();
-      }
-    }
-
-    Supplier<Committer> getCommitterSupplier(
-        String stream,
-        Map<PartitionIdType, SequenceOffsetType> lastPersistedOffsets
-    )
-    {
-      // Set up committer.
-      return () ->
-          new Committer()
-          {
-            @Override
-            public Object getMetadata()
-            {
-              lock.lock();
-
-              try {
-                Preconditions.checkState(
-                    assignments.isEmpty(),
-                    "This committer can be used only once all the records till sequences [%s] have been consumed, also make"
-                    + " sure to call updateAssignments before using this committer",
-                    endOffsets
-                );
-
-
-                // merge endOffsets for this sequence with globally lastPersistedOffsets
-                // This is done because this committer would be persisting only sub set of segments
-                // corresponding to the current sequence. Generally, lastPersistedOffsets should already
-                // cover endOffsets but just to be sure take max of sequences and persist that
-                for (Map.Entry<PartitionIdType, SequenceOffsetType> partitionOffset : endOffsets.entrySet()) {
-                  SequenceOffsetType newOffsets = partitionOffset.getValue();
-                  if (lastPersistedOffsets.containsKey(partitionOffset.getKey()) &&
-                      createSequenceNumber(lastPersistedOffsets.get(partitionOffset.getKey())).compareTo(
-                          createSequenceNumber(newOffsets)) > 0) {
-                    newOffsets = lastPersistedOffsets.get(partitionOffset.getKey());
-                  }
-                  lastPersistedOffsets.put(
-                      partitionOffset.getKey(),
-                      newOffsets
-                  );
-                }
-
-                // Publish metadata can be different from persist metadata as we are going to publish only
-                // subset of segments
-                return ImmutableMap.of(
-                    METADATA_NEXT_PARTITIONS, new SeekableStreamPartitions<>(stream, lastPersistedOffsets),
-                    METADATA_PUBLISH_PARTITIONS, new SeekableStreamPartitions<>(stream, endOffsets)
-                );
-              }
-              finally {
-                lock.unlock();
-              }
-            }
-
-            @Override
-            public void run()
-            {
-              // Do nothing.
-            }
-          };
-
-    }
-
-    TransactionalSegmentPublisher createPublisher(TaskToolbox toolbox, boolean useTransaction)
-    {
-      return (segments, commitMetadata) -> {
-        final SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> finalPartitions = deserializeSeekableStreamPartitionsFromMetadata(
-            toolbox.getObjectMapper(),
-            ((Map) Preconditions
-                .checkNotNull(commitMetadata, "commitMetadata")).get(METADATA_PUBLISH_PARTITIONS)
-        );
-
-        // Sanity check, we should only be publishing things that match our desired end state.
-        if (!getEndOffsets().equals(finalPartitions.getPartitionSequenceNumberMap())) {
-          throw new ISE(
-              "WTF?! Driver for sequence [%s], attempted to publish invalid metadata[%s].",
-              toString(),
-              commitMetadata
-          );
-        }
-
-        final SegmentTransactionalInsertAction action;
-
-        if (useTransaction) {
-          action = new SegmentTransactionalInsertAction(
-              segments,
-              createDataSourceMetadata(new SeekableStreamPartitions<>(
-                  finalPartitions.getStream(),
-                  getStartOffsets()
-              )),
-              createDataSourceMetadata(finalPartitions)
-          );
-        } else {
-          action = new SegmentTransactionalInsertAction(segments, null, null);
-        }
-
-        log.info("Publishing with isTransaction[%s].", useTransaction);
-
-        return toolbox.getTaskActionClient().submit(action);
-      };
-    }
-
-  }
-
   private boolean verifyInitialRecordAndSkipExclusivePartition(
       final OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType> record
   )
@@ -1923,7 +1624,12 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   }
 
   /**
-   * deserailizes the checkpoints into of Map<sequenceId, Map<PartitionIdType, SequenceOffsetType>>
+   * checks if the input seqNum marks end of shard. Used by Kinesis only
+   */
+  protected abstract boolean isEndOfShard(SequenceOffsetType seqNum);
+
+  /**
+   * deserializes the checkpoints into of Map<sequenceId, Map<PartitionIdType, SequenceOffsetType>>
    *
    * @param toolbox           task toolbox
    * @param checkpointsString the json-serialized checkpoint string
@@ -1939,7 +1645,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   ) throws IOException;
 
   /**
-   * Calculates the sequence number used to update `currentOffsets` after finishing reading a record.
+   * Calculates the sequence number used to update `currentOffsets` after finished reading a record.
    * In Kafka this returns sequenceNumeber + 1 since that's the next expected offset
    * In Kinesis this simply returns sequenceNumber, since the sequence numbers in Kinesis are not
    * contiguous and finding the next sequence number requires an expensive API call
@@ -1951,14 +1657,14 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   protected abstract SequenceOffsetType getSequenceNumberToStoreAfterRead(SequenceOffsetType sequenceNumber);
 
   /**
-   * deserialzies stored metadata into SeekableStreamPartitions
+   * deserializes stored metadata into SeekableStreamPartitions
    *
    * @param mapper json objectMapper
    * @param object metadata
    *
    * @return SeekableStreamPartitions
    */
-  protected abstract SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> deserializeSeekableStreamPartitionsFromMetadata(
+  protected abstract SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> deserializePartitionsFromMetadata(
       ObjectMapper mapper,
       Object object
   );
@@ -2028,4 +1734,6 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
    * partition we read from stream
    */
   protected abstract boolean isStartingSequenceOffsetsExclusive();
+
+  protected abstract TypeReference<List<SequenceMetadata<PartitionIdType, SequenceOffsetType>>> getSequenceMetadataTypeReference();
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
new file mode 100644
index 0000000..7fbc800
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.Committer;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
+import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class SequenceMetadata<PartitionIdType, SequenceOffsetType>
+{
+  private final int sequenceId;
+  private final String sequenceName;
+  private final Set<PartitionIdType> exclusiveStartPartitions;
+  private final Set<PartitionIdType> assignments;
+  private final boolean sentinel;
+  private boolean checkpointed;
+  /**
+   * Lock for accessing {@link #endOffsets} and {@link #checkpointed}. This lock is required because
+   * {@link #setEndOffsets)} can be called by both the main thread and the HTTP thread.
+   */
+  private final ReentrantLock lock = new ReentrantLock();
+
+  final Map<PartitionIdType, SequenceOffsetType> startOffsets;
+  final Map<PartitionIdType, SequenceOffsetType> endOffsets;
+
+  @JsonCreator
+  public SequenceMetadata(
+      @JsonProperty("sequenceId") int sequenceId,
+      @JsonProperty("sequenceName") String sequenceName,
+      @JsonProperty("startOffsets") Map<PartitionIdType, SequenceOffsetType> startOffsets,
+      @JsonProperty("endOffsets") Map<PartitionIdType, SequenceOffsetType> endOffsets,
+      @JsonProperty("checkpointed") boolean checkpointed,
+      @JsonProperty("exclusiveStartPartitions") Set<PartitionIdType> exclusiveStartPartitions
+  )
+  {
+    Preconditions.checkNotNull(sequenceName);
+    Preconditions.checkNotNull(startOffsets);
+    Preconditions.checkNotNull(endOffsets);
+    this.sequenceId = sequenceId;
+    this.sequenceName = sequenceName;
+    this.startOffsets = ImmutableMap.copyOf(startOffsets);
+    this.endOffsets = new HashMap<>(endOffsets);
+    this.assignments = new HashSet<>(startOffsets.keySet());
+    this.checkpointed = checkpointed;
+    this.sentinel = false;
+    this.exclusiveStartPartitions = exclusiveStartPartitions == null
+                                    ? Collections.emptySet()
+                                    : exclusiveStartPartitions;
+  }
+
+  @JsonProperty
+  public Set<PartitionIdType> getExclusiveStartPartitions()
+  {
+    return exclusiveStartPartitions;
+  }
+
+  @JsonProperty
+  public int getSequenceId()
+  {
+    return sequenceId;
+  }
+
+  @JsonProperty
+  public boolean isCheckpointed()
+  {
+    lock.lock();
+    try {
+      return checkpointed;
+    }
+    finally {
+      lock.unlock();
+    }
+  }
+
+  @JsonProperty
+  public String getSequenceName()
+  {
+    return sequenceName;
+  }
+
+  @JsonProperty
+  public Map<PartitionIdType, SequenceOffsetType> getStartOffsets()
+  {
+    return startOffsets;
+  }
+
+  @JsonProperty
+  public Map<PartitionIdType, SequenceOffsetType> getEndOffsets()
+  {
+    lock.lock();
+    try {
+      return endOffsets;
+    }
+    finally {
+      lock.unlock();
+    }
+  }
+
+  @JsonProperty
+  public boolean isSentinel()
+  {
+    return sentinel;
+  }
+
+  void setEndOffsets(Map<PartitionIdType, SequenceOffsetType> newEndOffsets)
+  {
+    lock.lock();
+    try {
+      endOffsets.putAll(newEndOffsets);
+      checkpointed = true;
+    }
+    finally {
+      lock.unlock();
+    }
+  }
+
+  void updateAssignments(
+      SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner,
+      Map<PartitionIdType, SequenceOffsetType> nextPartitionOffset
+  )
+  {
+    lock.lock();
+    try {
+      assignments.clear();
+      nextPartitionOffset.forEach((key, value) -> {
+        SequenceOffsetType endOffset = endOffsets.get(key);
+        if (SeekableStreamPartitions.NO_END_SEQUENCE_NUMBER.equals(endOffset)
+            || runner.createSequenceNumber(endOffset).compareTo(runner.createSequenceNumber(nextPartitionOffset.get(key))) > 0) {
+          assignments.add(key);
+        }
+      });
+    }
+    finally {
+      lock.unlock();
+    }
+  }
+
+  boolean isOpen()
+  {
+    return !assignments.isEmpty();
+  }
+
+  boolean canHandle(
+      SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner,
+      OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType> record
+  )
+  {
+    lock.lock();
+    try {
+      final OrderedSequenceNumber<SequenceOffsetType> partitionEndOffset = runner.createSequenceNumber(endOffsets.get(record.getPartitionId()));
+      final OrderedSequenceNumber<SequenceOffsetType> partitionStartOffset = runner.createSequenceNumber(startOffsets.get(
+          record.getPartitionId()));
+      final OrderedSequenceNumber<SequenceOffsetType> recordOffset = runner.createSequenceNumber(record.getSequenceNumber());
+      if (!isOpen() || recordOffset == null || partitionEndOffset == null || partitionStartOffset == null) {
+        return false;
+      }
+      boolean ret;
+      if (runner.isStartingSequenceOffsetsExclusive()) {
+        ret = recordOffset.compareTo(partitionStartOffset)
+              >= (getExclusiveStartPartitions().contains(record.getPartitionId()) ? 1 : 0);
+      } else {
+        ret = recordOffset.compareTo(partitionStartOffset) >= 0;
+      }
+
+      if (runner.isEndSequenceOffsetsExclusive()) {
+        ret &= recordOffset.compareTo(partitionEndOffset) < 0;
+      } else {
+        ret &= recordOffset.compareTo(partitionEndOffset) <= 0;
+      }
+
+      return ret;
+    }
+    finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public String toString()
+  {
+    lock.lock();
+    try {
+      return "SequenceMetadata{" +
+             "sequenceName='" + sequenceName + '\'' +
+             ", sequenceId=" + sequenceId +
+             ", startOffsets=" + startOffsets +
+             ", endOffsets=" + endOffsets +
+             ", assignments=" + assignments +
+             ", sentinel=" + sentinel +
+             ", checkpointed=" + checkpointed +
+             '}';
+    }
+    finally {
+      lock.unlock();
+    }
+  }
+
+  Supplier<Committer> getCommitterSupplier(
+      SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner,
+      String stream,
+      Map<PartitionIdType, SequenceOffsetType> lastPersistedOffsets
+  )
+  {
+    // Set up committer.
+    return () ->
+        new Committer()
+        {
+          @Override
+          public Object getMetadata()
+          {
+            lock.lock();
+
+            try {
+              Preconditions.checkState(
+                  assignments.isEmpty(),
+                  "This committer can be used only once all the records till sequences [%s] have been consumed, also make"
+                  + " sure to call updateAssignments before using this committer",
+                  endOffsets
+              );
+
+
+              // merge endOffsets for this sequence with globally lastPersistedOffsets
+              // This is done because this committer would be persisting only sub set of segments
+              // corresponding to the current sequence. Generally, lastPersistedOffsets should already
+              // cover endOffsets but just to be sure take max of sequences and persist that
+              for (Map.Entry<PartitionIdType, SequenceOffsetType> partitionOffset : endOffsets.entrySet()) {
+                SequenceOffsetType newOffsets = partitionOffset.getValue();
+                if (lastPersistedOffsets.containsKey(partitionOffset.getKey())
+                    && runner.createSequenceNumber(lastPersistedOffsets.get(partitionOffset.getKey()))
+                             .compareTo(runner.createSequenceNumber(newOffsets)) > 0) {
+                  newOffsets = lastPersistedOffsets.get(partitionOffset.getKey());
+                }
+                lastPersistedOffsets.put(
+                    partitionOffset.getKey(),
+                    newOffsets
+                );
+              }
+
+              // Publish metadata can be different from persist metadata as we are going to publish only
+              // subset of segments
+              return ImmutableMap.of(
+                  SeekableStreamIndexTaskRunner.METADATA_NEXT_PARTITIONS,
+                  new SeekableStreamPartitions<>(stream, lastPersistedOffsets),
+                  SeekableStreamIndexTaskRunner.METADATA_PUBLISH_PARTITIONS,
+                  new SeekableStreamPartitions<>(stream, endOffsets)
+              );
+            }
+            finally {
+              lock.unlock();
+            }
+          }
+
+          @Override
+          public void run()
+          {
+            // Do nothing.
+          }
+        };
+
+  }
+
+  TransactionalSegmentPublisher createPublisher(
+      SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner,
+      TaskToolbox toolbox,
+      boolean useTransaction
+  )
+  {
+    return (segments, commitMetadata) -> {
+      final Map commitMetaMap = (Map) Preconditions.checkNotNull(commitMetadata, "commitMetadata");
+      final SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> finalPartitions =
+          runner.deserializePartitionsFromMetadata(
+              toolbox.getObjectMapper(),
+              commitMetaMap.get(SeekableStreamIndexTaskRunner.METADATA_PUBLISH_PARTITIONS)
+          );
+
+      // Sanity check, we should only be publishing things that match our desired end state.
+      if (!getEndOffsets().equals(finalPartitions.getPartitionSequenceNumberMap())) {
+        throw new ISE(
+            "WTF?! Driver for sequence [%s], attempted to publish invalid metadata[%s].",
+            toString(),
+            commitMetadata
+        );
+      }
+
+      final SegmentTransactionalInsertAction action;
+
+      if (useTransaction) {
+        action = new SegmentTransactionalInsertAction(
+            segments,
+            runner.createDataSourceMetadata(
+                new SeekableStreamPartitions<>(finalPartitions.getStream(), getStartOffsets())
+            ),
+            runner.createDataSourceMetadata(finalPartitions)
+        );
+      } else {
+        action = new SegmentTransactionalInsertAction(segments, null, null);
+      }
+
+      return toolbox.getTaskActionClient().submit(action);
+    };
+  }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 25250ac..b93d15f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -119,8 +119,7 @@ import java.util.stream.Stream;
  * @param <PartitionIdType>    the type of the partition id, for example, partitions in Kafka are int type while partitions in Kinesis are String type
  * @param <SequenceOffsetType> the type of the sequence number or offsets, for example, Kafka uses long offsets while Kinesis uses String sequence numbers
  */
-public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>
-    implements Supervisor
+public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType> implements Supervisor
 {
   public static final String IS_INCREMENTAL_HANDOFF_SUPPORTED = "IS_INCREMENTAL_HANDOFF_SUPPORTED";
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org