Posted to commits@druid.apache.org by cw...@apache.org on 2019/04/24 22:14:06 UTC

[incubator-druid] 05/20: Support Kafka supervisor adopting running tasks between versions (#7212)

This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch 0.14.1-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git

commit 6756b6fbced9029c0c5d3301b13dce03e6f75047
Author: Justin Borromeo <jb...@edu.uwaterloo.ca>
AuthorDate: Wed Apr 10 18:16:38 2019 -0700

    Support Kafka supervisor adopting running tasks between versions (#7212)
    
    * Recompute hash in isTaskCurrent() and added tests
    
    * Fixed checkstyle stuff
    
    * Fixed failing tests
    
    * Make TestableKafkaSupervisorWithCustomIsTaskCurrent static
    
    * Add doc
    
    * baseSequenceName change
    
    * Added comment
    
    * WIP
    
    * Fixed imports
    
    * Undid lambda change for diff's sake
    
    * Cleanup
    
    * Added comment
    
    * Reinsert Kafka tests
    
    * Re-added kinesis test
    
    * Re-add bad partition assignment in kinesis supervisor test
    
    * Nit
    
    * Misnamed var
---
 .../kafka/KafkaIndexTaskTuningConfigTest.java      |   96 ++
 .../kafka/supervisor/KafkaSupervisorTest.java      |  709 +++++++++++--
 .../TestModifiedKafkaIndexTaskTuningConfig.java    |   88 ++
 .../kinesis/KinesisIndexTaskTuningConfigTest.java  |  117 +++
 .../kinesis/supervisor/KinesisSupervisorTest.java  | 1108 +++++++++++++++-----
 .../TestModifiedKinesisIndexTaskTuningConfig.java  |  132 +++
 .../supervisor/SeekableStreamSupervisor.java       |  165 ++-
 .../druid/segment/indexing/DataSchemaTest.java     |   79 ++
 .../segment/indexing/TestModifiedDataSchema.java   |   56 +
 9 files changed, 2126 insertions(+), 424 deletions(-)

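The heart of the change is in isTaskCurrent(): per the commit message, the supervisor now recomputes the sequence-name hash from its current dataSchema and tuning config instead of trusting the value computed by whichever Druid version originally launched the task, so an upgraded supervisor can adopt tasks started under an older version. A minimal self-contained model of that comparison, with illustrative names and hashing rather than Druid's actual internals:

    import java.util.Map;
    import java.util.Objects;
    import java.util.Optional;

    // Toy model of the recompute-and-compare check exercised by the new
    // testIsTaskCurrent() below. Names and the hash are illustrative only.
    final class IsTaskCurrentModel
    {
      // Re-derive the expected base sequence name from the supervisor's
      // *current* view of the spec.
      static String generateSequenceName(
          Map<Integer, Long> startPartitions,
          Optional<String> minimumMessageTime,
          Optional<String> maximumMessageTime,
          String serializedDataSchema,
          String serializedTuningConfig
      )
      {
        int hash = Objects.hash(
            startPartitions,
            minimumMessageTime,
            maximumMessageTime,
            serializedDataSchema,
            serializedTuningConfig
        );
        return "sequence_" + Integer.toHexString(hash);
      }

      // A stored task is "current" only if the recomputed name matches the
      // baseSequenceName persisted with the task; any drift in partitions,
      // message-time bounds, schema, or tuning config fails the check.
      static boolean isTaskCurrent(String storedBaseSequenceName, String recomputedName)
      {
        return storedBaseSequenceName.equals(recomputedName);
      }
    }
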
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
index 14ecd4e..63b0e98 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.kafka;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
+import org.apache.druid.indexing.kafka.test.TestModifiedKafkaIndexTaskTuningConfig;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.indexing.TuningConfig;
@@ -30,6 +31,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.File;
+import java.io.IOException;
 
 public class KafkaIndexTaskTuningConfigTest
 {
@@ -145,6 +147,100 @@ public class KafkaIndexTaskTuningConfigTest
     Assert.assertEquals(5L, copy.getHandoffConditionTimeout());
   }
 
+  @Test
+  public void testSerdeWithModifiedTuningConfigAddedField() throws IOException
+  {
+    KafkaIndexTaskTuningConfig base = new KafkaIndexTaskTuningConfig(
+        1,
+        null,
+        2,
+        10L,
+        new Period("PT3S"),
+        new File("/tmp/xxx"),
+        4,
+        new IndexSpec(),
+        true,
+        true,
+        5L,
+        null,
+        null,
+        null,
+        true,
+        42,
+        42
+    );
+
+    String serialized = mapper.writeValueAsString(base);
+    TestModifiedKafkaIndexTaskTuningConfig deserialized =
+        mapper.readValue(serialized, TestModifiedKafkaIndexTaskTuningConfig.class);
+
+    Assert.assertEquals(null, deserialized.getExtra());
+    Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory());
+    Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory());
+    Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
+    Assert.assertEquals(base.getMaxTotalRows(), deserialized.getMaxTotalRows());
+    Assert.assertEquals(base.getIntermediatePersistPeriod(), deserialized.getIntermediatePersistPeriod());
+    Assert.assertEquals(base.getBasePersistDirectory(), deserialized.getBasePersistDirectory());
+    Assert.assertEquals(base.getMaxPendingPersists(), deserialized.getMaxPendingPersists());
+    Assert.assertEquals(base.getIndexSpec(), deserialized.getIndexSpec());
+    Assert.assertEquals(base.getBuildV9Directly(), deserialized.getBuildV9Directly());
+    Assert.assertEquals(base.isReportParseExceptions(), deserialized.isReportParseExceptions());
+    Assert.assertEquals(base.getHandoffConditionTimeout(), deserialized.getHandoffConditionTimeout());
+    Assert.assertEquals(base.isResetOffsetAutomatically(), deserialized.isResetOffsetAutomatically());
+    Assert.assertEquals(base.getSegmentWriteOutMediumFactory(), deserialized.getSegmentWriteOutMediumFactory());
+    Assert.assertEquals(base.getIntermediateHandoffPeriod(), deserialized.getIntermediateHandoffPeriod());
+    Assert.assertEquals(base.isLogParseExceptions(), deserialized.isLogParseExceptions());
+    Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions());
+    Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
+  }
+
+  @Test
+  public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException
+  {
+    TestModifiedKafkaIndexTaskTuningConfig base = new TestModifiedKafkaIndexTaskTuningConfig(
+        1,
+        null,
+        2,
+        10L,
+        new Period("PT3S"),
+        new File("/tmp/xxx"),
+        4,
+        new IndexSpec(),
+        true,
+        true,
+        5L,
+        null,
+        null,
+        null,
+        true,
+        42,
+        42,
+        "extra string"
+    );
+
+    String serialized = mapper.writeValueAsString(base);
+    KafkaIndexTaskTuningConfig deserialized =
+        mapper.readValue(serialized, KafkaIndexTaskTuningConfig.class);
+
+    Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory());
+    Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory());
+    Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
+    Assert.assertEquals(base.getMaxTotalRows(), deserialized.getMaxTotalRows());
+    Assert.assertEquals(base.getIntermediatePersistPeriod(), deserialized.getIntermediatePersistPeriod());
+    Assert.assertEquals(base.getBasePersistDirectory(), deserialized.getBasePersistDirectory());
+    Assert.assertEquals(base.getMaxPendingPersists(), deserialized.getMaxPendingPersists());
+    Assert.assertEquals(base.getIndexSpec(), deserialized.getIndexSpec());
+    Assert.assertEquals(base.getBuildV9Directly(), deserialized.getBuildV9Directly());
+    Assert.assertEquals(base.isReportParseExceptions(), deserialized.isReportParseExceptions());
+    Assert.assertEquals(base.getHandoffConditionTimeout(), deserialized.getHandoffConditionTimeout());
+    Assert.assertEquals(base.isResetOffsetAutomatically(), deserialized.isResetOffsetAutomatically());
+    Assert.assertEquals(base.getSegmentWriteOutMediumFactory(), deserialized.getSegmentWriteOutMediumFactory());
+    Assert.assertEquals(base.getIntermediateHandoffPeriod(), deserialized.getIntermediateHandoffPeriod());
+    Assert.assertEquals(base.isLogParseExceptions(), deserialized.isLogParseExceptions());
+    Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions());
+    Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
+  }
+
   private static KafkaIndexTaskTuningConfig copy(KafkaIndexTaskTuningConfig config)
   {
     return new KafkaIndexTaskTuningConfig(
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index bb8c6a2..62280e0 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -49,6 +49,7 @@ import org.apache.druid.indexing.kafka.KafkaIndexTask;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskClient;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskIOConfig;
+import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig;
 import org.apache.druid.indexing.kafka.test.TestBroker;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -61,6 +62,7 @@ import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
 import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.Status;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData;
 import org.apache.druid.java.util.common.DateTimes;
@@ -139,7 +141,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
   private static int topicPostfix;
   private static ZkUtils zkUtils;
 
-
   private final int numThreads;
 
   private KafkaSupervisor supervisor;
@@ -259,7 +260,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test
   public void testNoInitialState() throws Exception
   {
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, false);
     addSomeEvents(1);
 
     Capture<KafkaIndexTask> captured = Capture.newInstance();
@@ -315,7 +316,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test
   public void testSkipOffsetGaps() throws Exception
   {
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, true);
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, false);
     addSomeEvents(1);
 
     Capture<KafkaIndexTask> captured = Capture.newInstance();
@@ -338,13 +339,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
     KafkaIndexTask task = captured.getValue();
     KafkaIndexTaskIOConfig taskConfig = task.getIOConfig();
 
-    Assert.assertTrue("skipOffsetGaps", taskConfig.isSkipOffsetGaps());
   }
 
   @Test
   public void testMultiTask() throws Exception
   {
-    supervisor = getSupervisor(1, 2, true, "PT1H", null, null, false);
+    supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null, false);
     addSomeEvents(1);
 
     Capture<KafkaIndexTask> captured = Capture.newInstance(CaptureType.ALL);
@@ -399,7 +399,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test
   public void testReplicas() throws Exception
   {
-    supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false);
+    supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null, false);
     addSomeEvents(1);
 
     Capture<KafkaIndexTask> captured = Capture.newInstance(CaptureType.ALL);
@@ -454,7 +454,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test
   public void testLateMessageRejectionPeriod() throws Exception
   {
-    supervisor = getSupervisor(2, 1, true, "PT1H", new Period("PT1H"), null, false);
+    supervisor = getTestableSupervisor(2, 1, true, "PT1H", new Period("PT1H"), null, false);
     addSomeEvents(1);
 
     Capture<KafkaIndexTask> captured = Capture.newInstance(CaptureType.ALL);
@@ -493,7 +493,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test
   public void testEarlyMessageRejectionPeriod() throws Exception
   {
-    supervisor = getSupervisor(2, 1, true, "PT1H", null, new Period("PT1H"), false);
+    supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, new Period("PT1H"), false);
     addSomeEvents(1);
 
     Capture<KafkaIndexTask> captured = Capture.newInstance(CaptureType.ALL);
@@ -535,7 +535,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
    */
   public void testLatestOffset() throws Exception
   {
-    supervisor = getSupervisor(1, 1, false, "PT1H", null, null, false);
+    supervisor = getTestableSupervisor(1, 1, false, "PT1H", null, null, false);
     addSomeEvents(1100);
 
     Capture<KafkaIndexTask> captured = Capture.newInstance();
@@ -576,7 +576,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
    */
   public void testDatasourceMetadata() throws Exception
   {
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, false);
     addSomeEvents(100);
 
     Capture<KafkaIndexTask> captured = Capture.newInstance();
@@ -615,7 +615,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test(expected = ISE.class)
   public void testBadMetadataOffsets() throws Exception
   {
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, false);
     addSomeEvents(1);
 
     expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
@@ -632,47 +632,14 @@ public class KafkaSupervisorTest extends EasyMockSupport
   }
 
   @Test
-  public void testKillIncompatibleTasks() throws Exception
+  public void testDontKillTasksWithMismatchedDatasourceAndType() throws Exception
   {
-    supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false);
+    supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null, false);
     addSomeEvents(1);
 
-    // unexpected # of partitions (kill)
+    // different datasource (don't kill)
     Task id1 = createKafkaIndexTask(
         "id1",
-        DATASOURCE,
-        1,
-        new SeekableStreamStartSequenceNumbers<>("topic", ImmutableMap.of(0, 0L), ImmutableSet.of()),
-        new SeekableStreamEndSequenceNumbers<>("topic", ImmutableMap.of(0, 10L)),
-        null,
-        null
-    );
-
-    // correct number of partitions and ranges (don't kill)
-    Task id2 = createKafkaIndexTask(
-        "id2",
-        DATASOURCE,
-        0,
-        new SeekableStreamStartSequenceNumbers<>("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L), ImmutableSet.of()),
-        new SeekableStreamEndSequenceNumbers<>("topic", ImmutableMap.of(0, 333L, 1, 333L, 2, 333L)),
-        null,
-        null
-    );
-
-    // unexpected range on partition 2 (kill)
-    Task id3 = createKafkaIndexTask(
-        "id3",
-        DATASOURCE,
-        1,
-        new SeekableStreamStartSequenceNumbers<>("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 1L), ImmutableSet.of()),
-        new SeekableStreamEndSequenceNumbers<>("topic", ImmutableMap.of(0, 333L, 1, 333L, 2, 330L)),
-        null,
-        null
-    );
-
-    // different datasource (don't kill)
-    Task id4 = createKafkaIndexTask(
-        "id4",
         "other-datasource",
         2,
         new SeekableStreamStartSequenceNumbers<>("topic", ImmutableMap.of(0, 0L), ImmutableSet.of()),
@@ -682,8 +649,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
     );
 
     // non KafkaIndexTask (don't kill)
-    Task id5 = new RealtimeIndexTask(
-        "id5",
+    Task id2 = new RealtimeIndexTask(
+        "id2",
         null,
         new FireDepartment(
             dataSchema,
@@ -693,18 +660,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
         null
     );
 
-    List<Task> existingTasks = ImmutableList.of(id1, id2, id3, id4, id5);
+    List<Task> existingTasks = ImmutableList.of(id1, id2);
 
     expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
     expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes();
-    expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
-    expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
-    expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
-    expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
-    expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
-    expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
     expect(taskClient.getStatusAsync(EasyMock.anyString()))
         .andReturn(Futures.immediateFuture(Status.NOT_STARTED))
         .anyTimes();
@@ -716,18 +677,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
             null
         )
     ).anyTimes();
-    expect(taskClient.stopAsync("id1", false)).andReturn(Futures.immediateFuture(true));
-    expect(taskClient.stopAsync("id3", false)).andReturn(Futures.immediateFuture(false));
+
     taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
-    taskQueue.shutdown("id3", "Task [%s] failed to stop in a timely manner, killing task", "id3");
 
-    expect(taskQueue.add(anyObject(Task.class))).andReturn(true);
-
-    TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
-    checkpoints.put(0, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
-        .andReturn(Futures.immediateFuture(checkpoints))
-        .times(2);
+    expect(taskQueue.add(anyObject(Task.class))).andReturn(true).anyTimes();
 
     replayAll();
 
@@ -739,7 +692,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test
   public void testKillBadPartitionAssignment() throws Exception
   {
-    supervisor = getSupervisor(1, 2, true, "PT1H", null, null, false);
+    supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null, false);
     addSomeEvents(1);
 
     Task id1 = createKafkaIndexTask(
@@ -843,10 +796,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
     verifyAll();
   }
 
+
   @Test
   public void testRequeueTaskWhenFailed() throws Exception
   {
-    supervisor = getSupervisor(2, 2, true, "PT1H", null, null, false);
+    supervisor = getTestableSupervisor(2, 2, true, "PT1H", null, null, false);
     addSomeEvents(1);
 
     Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
@@ -935,7 +889,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test
   public void testRequeueAdoptedTaskWhenFailed() throws Exception
   {
-    supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false);
+    supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null, false);
     addSomeEvents(1);
 
     DateTime now = DateTimes.nowUtc();
@@ -1036,7 +990,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test
   public void testQueueNextTasksOnSuccess() throws Exception
   {
-    supervisor = getSupervisor(2, 2, true, "PT1H", null, null, false);
+    supervisor = getTestableSupervisor(2, 2, true, "PT1H", null, null, false);
     addSomeEvents(1);
 
     Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
@@ -1138,7 +1092,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   {
     final TaskLocation location = new TaskLocation("testHost", 1234, -1);
 
-    supervisor = getSupervisor(2, 2, true, "PT1M", null, null, false);
+    supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null, false);
     addSomeEvents(100);
 
     Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
@@ -1233,7 +1187,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   {
     final TaskLocation location = new TaskLocation("testHost", 1234, -1);
 
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, false);
     addSomeEvents(1);
 
     Task task = createKafkaIndexTask(
@@ -1348,7 +1302,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   {
     final TaskLocation location = new TaskLocation("testHost", 1234, -1);
 
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, false);
     addSomeEvents(1);
 
     Task task = createKafkaIndexTask(
@@ -1456,7 +1410,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
     final DateTime startTime = DateTimes.nowUtc();
 
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, false);
     addSomeEvents(6);
 
     Task id1 = createKafkaIndexTask(
@@ -1563,7 +1517,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test
   public void testKillUnresponsiveTasksWhileGettingStartTime() throws Exception
   {
-    supervisor = getSupervisor(2, 2, true, "PT1H", null, null, false);
+    supervisor = getTestableSupervisor(2, 2, true, "PT1H", null, null, false);
     addSomeEvents(1);
 
     Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
@@ -1622,7 +1576,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   {
     final TaskLocation location = new TaskLocation("testHost", 1234, -1);
 
-    supervisor = getSupervisor(2, 2, true, "PT1M", null, null, false);
+    supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null, false);
     addSomeEvents(100);
 
     Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
@@ -1708,7 +1662,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   {
     final TaskLocation location = new TaskLocation("testHost", 1234, -1);
 
-    supervisor = getSupervisor(2, 2, true, "PT1M", null, null, false);
+    supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null, false);
     addSomeEvents(100);
 
     Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
@@ -1799,7 +1753,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test(expected = IllegalStateException.class)
   public void testStopNotStarted()
   {
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, false);
     supervisor.stop(false);
   }
 
@@ -1811,7 +1765,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     taskRunner.unregisterListener(StringUtils.format("KafkaSupervisor-%s", DATASOURCE));
     replayAll();
 
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, false);
     supervisor.start();
     supervisor.stop(false);
 
@@ -1825,7 +1779,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
     final DateTime startTime = DateTimes.nowUtc();
 
-    supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false);
+    supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null, false);
     addSomeEvents(1);
 
     Task id1 = createKafkaIndexTask(
@@ -1937,7 +1891,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
     replayAll();
 
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, false);
     supervisor.start();
     supervisor.runInternal();
     verifyAll();
@@ -1954,7 +1908,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test
   public void testResetDataSourceMetadata() throws Exception
   {
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, false);
     expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
@@ -1970,7 +1924,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Capture<DataSourceMetadata> captureDataSourceMetadata = EasyMock.newCapture();
 
     KafkaDataSourceMetadata kafkaDataSourceMetadata = new KafkaDataSourceMetadata(
-        new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 1000L, 1, 1000L, 2, 1000L), ImmutableSet.of())
+        new SeekableStreamStartSequenceNumbers<>(
+            topic,
+            ImmutableMap.of(0, 1000L, 1, 1000L, 2, 1000L),
+            ImmutableSet.of()
+        )
     );
 
     KafkaDataSourceMetadata resetMetadata = new KafkaDataSourceMetadata(
@@ -2006,7 +1964,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test
   public void testResetNoDataSourceMetadata() throws Exception
   {
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, false);
     expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
@@ -2042,7 +2000,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
     final DateTime startTime = DateTimes.nowUtc();
 
-    supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false);
+    supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null, false);
     addSomeEvents(1);
 
     Task id1 = createKafkaIndexTask(
@@ -2140,7 +2098,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   public void testNoDataIngestionTasks() throws Exception
   {
     final DateTime startTime = DateTimes.nowUtc();
-    supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false);
+    supervisor = getTestableSupervisor(2, 1, true, "PT1S", null, null, false);
     //not adding any events
     Task id1 = createKafkaIndexTask(
         "id1",
@@ -2236,7 +2194,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   public void testCheckpointForInactiveTaskGroup()
       throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException
   {
-    supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false);
+    supervisor = getTestableSupervisor(2, 1, true, "PT1S", null, null, false);
     //not adding any events
     final Task id1 = createKafkaIndexTask(
         "id1",
@@ -2329,8 +2287,16 @@ public class KafkaSupervisorTest extends EasyMockSupport
     supervisor.checkpoint(
         0,
         ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(),
-        new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topic, checkpoints.get(0), ImmutableSet.of())),
-        new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topic, fakeCheckpoints, fakeCheckpoints.keySet()))
+        new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
+            topic,
+            checkpoints.get(0),
+            ImmutableSet.of()
+        )),
+        new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
+            topic,
+            fakeCheckpoints,
+            fakeCheckpoints.keySet()
+        ))
     );
 
     while (supervisor.getNoticesQueueSize() > 0) {
@@ -2348,7 +2314,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   public void testCheckpointForUnknownTaskGroup()
       throws InterruptedException
   {
-    supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false);
+    supervisor = getTestableSupervisor(2, 1, true, "PT1S", null, null, false);
     //not adding any events
     final Task id1 = createKafkaIndexTask(
         "id1",
@@ -2409,8 +2375,16 @@ public class KafkaSupervisorTest extends EasyMockSupport
     supervisor.checkpoint(
         0,
         ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(),
-        new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topic, Collections.emptyMap(), ImmutableSet.of())),
-        new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topic, Collections.emptyMap(), ImmutableSet.of()))
+        new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
+            topic,
+            Collections.emptyMap(),
+            ImmutableSet.of()
+        )),
+        new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
+            topic,
+            Collections.emptyMap(),
+            ImmutableSet.of()
+        ))
     );
 
     while (supervisor.getNoticesQueueSize() > 0) {
@@ -2437,7 +2411,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   public void testCheckpointWithNullTaskGroupId()
       throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException
   {
-    supervisor = getSupervisor(1, 3, true, "PT1S", null, null, false);
+    supervisor = getTestableSupervisor(1, 3, true, "PT1S", null, null, false);
     //not adding any events
     final Task id1 = createKafkaIndexTask(
         "id1",
@@ -2515,7 +2489,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
     supervisor.checkpoint(
         null,
         ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(),
-        new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topic, checkpoints.get(0), ImmutableSet.of())),
+        new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
+            topic,
+            checkpoints.get(0),
+            ImmutableSet.of()
+        )),
         new KafkaDataSourceMetadata(
             new SeekableStreamStartSequenceNumbers<>(topic, newCheckpoints.get(0), newCheckpoints.get(0).keySet())
         )
@@ -2531,7 +2509,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test
   public void testSuspendedNoRunningTasks() throws Exception
   {
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false, true, kafkaHost);
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, false, true, kafkaHost);
     addSomeEvents(1);
 
     expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -2564,7 +2542,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
     final DateTime startTime = DateTimes.nowUtc();
 
-    supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false, true, kafkaHost);
+    supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null, false, true, kafkaHost);
     addSomeEvents(1);
 
     Task id1 = createKafkaIndexTask(
@@ -2672,7 +2650,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
     replayAll();
 
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false, true, kafkaHost);
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, false, true, kafkaHost);
     supervisor.start();
     supervisor.runInternal();
     verifyAll();
@@ -2689,7 +2667,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   public void testFailedInitializationAndRecovery() throws Exception
   {
     // Block the supervisor initialization with a bad hostname config, make sure this doesn't block the lifecycle
-    supervisor = getSupervisor(
+    supervisor = getTestableSupervisor(
         1,
         1,
         true,
@@ -2797,7 +2775,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test
   public void testGetCurrentTotalStats()
   {
-    supervisor = getSupervisor(1, 2, true, "PT1H", null, null, false);
+    supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null, false, false, kafkaHost);
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         supervisor.getTaskGroupIdForPartition(0),
         ImmutableMap.of(0, 0L),
@@ -2838,6 +2816,286 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertEquals(ImmutableMap.of("task2", ImmutableMap.of("prop2", "val2")), stats.get("1"));
   }
 
+  @Test
+  public void testDoNotKillCompatibleTasks()
+      throws Exception
+  {
+    // This supervisor always returns true for isTaskCurrent -> it should not kill its tasks
+    int numReplicas = 2;
+    supervisor = getTestableSupervisorCustomIsTaskCurrent(
+        numReplicas,
+        1,
+        true,
+        "PT1H",
+        new Period("P1D"),
+        new Period("P1D"),
+        false,
+        true
+    );
+
+    addSomeEvents(1);
+
+    Task task = createKafkaIndexTask(
+        "id1",
+        DATASOURCE,
+        0,
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(0, 0L, 2, 0L),
+            ImmutableSet.of()
+        ),
+        new SeekableStreamEndSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
+        ),
+        null,
+        null
+    );
+
+    List<Task> existingTasks = ImmutableList.of(task);
+
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes();
+    EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+    EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(task)).anyTimes();
+    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
+            .andReturn(Futures.immediateFuture(Status.NOT_STARTED))
+            .anyTimes();
+    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
+            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
+            .anyTimes();
+    EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            null
+        )
+    ).anyTimes();
+
+    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
+    EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true);
+
+    TreeMap<Integer, Map<Integer, Long>> checkpoints1 = new TreeMap<>();
+    checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
+
+    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(checkpoints1))
+            .times(numReplicas);
+
+    replayAll();
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+  }
+
+  @Test
+  public void testKillIncompatibleTasks()
+      throws Exception
+  {
+    // This supervisor always returns false for isTaskCurrent -> it should kill its tasks
+    int numReplicas = 2;
+    supervisor = getTestableSupervisorCustomIsTaskCurrent(
+        numReplicas,
+        1,
+        true,
+        "PT1H",
+        new Period("P1D"),
+        new Period("P1D"),
+        false,
+        false
+    );
+
+    addSomeEvents(1);
+
+    Task task = createKafkaIndexTask(
+        "id1",
+        DATASOURCE,
+        0,
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(0, 0L, 2, 0L),
+            ImmutableSet.of()
+        ),
+        new SeekableStreamEndSequenceNumbers<>("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    List<Task> existingTasks = ImmutableList.of(task);
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes();
+    EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+    EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(task)).anyTimes();
+    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
+            .andReturn(Futures.immediateFuture(Status.NOT_STARTED))
+            .anyTimes();
+    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
+            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
+            .anyTimes();
+    EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            null
+        )
+    ).anyTimes();
+    EasyMock.expect(taskClient.stopAsync("id1", false)).andReturn(Futures.immediateFuture(true));
+    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
+    EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true).times(2);
+
+    replayAll();
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+  }
+
+  @Test
+  public void testIsTaskCurrent()
+  {
+    DateTime minMessageTime = DateTimes.nowUtc();
+    DateTime maxMessageTime = DateTimes.nowUtc().plus(10000);
+
+    KafkaSupervisor supervisor = getSupervisor(
+        2,
+        1,
+        true,
+        "PT1H",
+        new Period("P1D"),
+        new Period("P1D"),
+        false,
+        kafkaHost,
+        dataSchema,
+        tuningConfig
+    );
+
+    supervisor.addTaskGroupToActivelyReadingTaskGroup(
+        42,
+        ImmutableMap.of(0, 0L, 2, 0L),
+        Optional.of(minMessageTime),
+        Optional.of(maxMessageTime),
+        ImmutableSet.of("id1", "id2", "id3", "id4"),
+        ImmutableSet.of()
+    );
+
+    DataSchema modifiedDataSchema = getDataSchema("some other datasource");
+
+    KafkaSupervisorTuningConfig modifiedTuningConfig = new KafkaSupervisorTuningConfig(
+        42, // This is different
+        null,
+        50000,
+        null,
+        new Period("P1Y"),
+        new File("/test"),
+        null,
+        null,
+        true,
+        false,
+        null,
+        null,
+        null,
+        numThreads,
+        TEST_CHAT_THREADS,
+        TEST_CHAT_RETRIES,
+        TEST_HTTP_TIMEOUT,
+        TEST_SHUTDOWN_TIMEOUT,
+        null,
+        null,
+        null,
+        null,
+        null
+    );
+
+    KafkaIndexTask taskFromStorage = createKafkaIndexTask(
+        "id1",
+        0,
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(0, 0L, 2, 0L),
+            ImmutableSet.of()
+        ),
+        new SeekableStreamEndSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
+        ),
+        minMessageTime,
+        maxMessageTime,
+        dataSchema
+    );
+
+    KafkaIndexTask taskFromStorageMismatchedDataSchema = createKafkaIndexTask(
+        "id2",
+        0,
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(0, 0L, 2, 0L),
+            ImmutableSet.of()
+        ),
+        new SeekableStreamEndSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
+        ),
+        minMessageTime,
+        maxMessageTime,
+        modifiedDataSchema
+    );
+
+    KafkaIndexTask taskFromStorageMismatchedTuningConfig = createKafkaIndexTask(
+        "id3",
+        0,
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(0, 0L, 2, 0L),
+            ImmutableSet.of()
+        ),
+        new SeekableStreamEndSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
+        ),
+        minMessageTime,
+        maxMessageTime,
+        dataSchema,
+        modifiedTuningConfig
+    );
+
+    KafkaIndexTask taskFromStorageMismatchedPartitionsWithTaskGroup = createKafkaIndexTask(
+        "id4",
+        0,
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(0, 0L, 2, 6L),
+            ImmutableSet.of()
+        ),
+        new SeekableStreamEndSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
+        ),
+        minMessageTime,
+        maxMessageTime,
+        dataSchema
+    );
+
+    EasyMock.expect(taskStorage.getTask("id1"))
+            .andReturn(Optional.of(taskFromStorage))
+            .once();
+    EasyMock.expect(taskStorage.getTask("id2"))
+            .andReturn(Optional.of(taskFromStorageMismatchedDataSchema))
+            .once();
+    EasyMock.expect(taskStorage.getTask("id3"))
+            .andReturn(Optional.of(taskFromStorageMismatchedTuningConfig))
+            .once();
+    EasyMock.expect(taskStorage.getTask("id4"))
+            .andReturn(Optional.of(taskFromStorageMismatchedPartitionsWithTaskGroup))
+            .once();
+
+    replayAll();
+
+    Assert.assertTrue(supervisor.isTaskCurrent(42, "id1"));
+    Assert.assertFalse(supervisor.isTaskCurrent(42, "id2"));
+    Assert.assertFalse(supervisor.isTaskCurrent(42, "id3"));
+    Assert.assertFalse(supervisor.isTaskCurrent(42, "id4"));
+    verifyAll();
+  }
+
   private void addSomeEvents(int numEventsPerPartition) throws Exception
   {
     //create topic manually
@@ -2859,7 +3117,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     }
   }
 
-  private KafkaSupervisor getSupervisor(
+  private KafkaSupervisor getTestableSupervisor(
       int replicas,
       int taskCount,
       boolean useEarliestOffset,
@@ -2869,7 +3127,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
       boolean skipOffsetGaps
   )
   {
-    return getSupervisor(
+    return getTestableSupervisor(
         replicas,
         taskCount,
         useEarliestOffset,
@@ -2882,7 +3140,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     );
   }
 
-  private KafkaSupervisor getSupervisor(
+  private KafkaSupervisor getTestableSupervisor(
       int replicas,
       int taskCount,
       boolean useEarliestOffset,
@@ -2959,6 +3217,170 @@ public class KafkaSupervisorTest extends EasyMockSupport
     );
   }
 
+  /**
+   * Use when you want to mock the return value of SeekableStreamSupervisor#isTaskCurrent()
+   */
+  private KafkaSupervisor getTestableSupervisorCustomIsTaskCurrent(
+      int replicas,
+      int taskCount,
+      boolean useEarliestOffset,
+      String duration,
+      Period lateMessageRejectionPeriod,
+      Period earlyMessageRejectionPeriod,
+      boolean suspended,
+      boolean isTaskCurrentReturn
+  )
+  {
+    Map<String, Object> consumerProperties = new HashMap<>();
+    consumerProperties.put("myCustomKey", "myCustomValue");
+    consumerProperties.put("bootstrap.servers", kafkaHost);
+    consumerProperties.put("isolation.level", "read_committed");
+    KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
+        topic,
+        replicas,
+        taskCount,
+        new Period(duration),
+        consumerProperties,
+        KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+        new Period("P1D"),
+        new Period("PT30S"),
+        useEarliestOffset,
+        new Period("PT30M"),
+        lateMessageRejectionPeriod,
+        earlyMessageRejectionPeriod,
+        false
+    );
+
+    KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory(
+        null,
+        null
+    )
+    {
+      @Override
+      public KafkaIndexTaskClient build(
+          TaskInfoProvider taskInfoProvider,
+          String dataSource,
+          int numThreads,
+          Duration httpTimeout,
+          long numRetries
+      )
+      {
+        Assert.assertEquals(TEST_CHAT_THREADS, numThreads);
+        Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), httpTimeout);
+        Assert.assertEquals(TEST_CHAT_RETRIES, numRetries);
+        return taskClient;
+      }
+    };
+
+    return new TestableKafkaSupervisorWithCustomIsTaskCurrent(
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        taskClientFactory,
+        objectMapper,
+        new KafkaSupervisorSpec(
+            dataSchema,
+            tuningConfig,
+            kafkaSupervisorIOConfig,
+            null,
+            suspended,
+            taskStorage,
+            taskMaster,
+            indexerMetadataStorageCoordinator,
+            taskClientFactory,
+            objectMapper,
+            new NoopServiceEmitter(),
+            new DruidMonitorSchedulerConfig(),
+            rowIngestionMetersFactory
+        ),
+        rowIngestionMetersFactory,
+        isTaskCurrentReturn
+    );
+  }
+
+  /**
+   * Use when you don't want generateSequenceNumber overridden
+   */
+
+  private KafkaSupervisor getSupervisor(
+      int replicas,
+      int taskCount,
+      boolean useEarliestOffset,
+      String duration,
+      Period lateMessageRejectionPeriod,
+      Period earlyMessageRejectionPeriod,
+      boolean suspended,
+      String kafkaHost,
+      DataSchema dataSchema,
+      KafkaSupervisorTuningConfig tuningConfig
+  )
+  {
+    Map<String, Object> consumerProperties = new HashMap<>();
+    consumerProperties.put("myCustomKey", "myCustomValue");
+    consumerProperties.put("bootstrap.servers", kafkaHost);
+    consumerProperties.put("isolation.level", "read_committed");
+    KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
+        topic,
+        replicas,
+        taskCount,
+        new Period(duration),
+        consumerProperties,
+        KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+        new Period("P1D"),
+        new Period("PT30S"),
+        useEarliestOffset,
+        new Period("PT30M"),
+        lateMessageRejectionPeriod,
+        earlyMessageRejectionPeriod,
+        false
+    );
+
+    KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory(
+        null,
+        null
+    )
+    {
+      @Override
+      public KafkaIndexTaskClient build(
+          TaskInfoProvider taskInfoProvider,
+          String dataSource,
+          int numThreads,
+          Duration httpTimeout,
+          long numRetries
+      )
+      {
+        Assert.assertEquals(TEST_CHAT_THREADS, numThreads);
+        Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), httpTimeout);
+        Assert.assertEquals(TEST_CHAT_RETRIES, numRetries);
+        return taskClient;
+      }
+    };
+
+    return new KafkaSupervisor(
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        taskClientFactory,
+        objectMapper,
+        new KafkaSupervisorSpec(
+            dataSchema,
+            tuningConfig,
+            kafkaSupervisorIOConfig,
+            null,
+            suspended,
+            taskStorage,
+            taskMaster,
+            indexerMetadataStorageCoordinator,
+            taskClientFactory,
+            objectMapper,
+            new NoopServiceEmitter(),
+            new DruidMonitorSchedulerConfig(),
+            rowIngestionMetersFactory
+        ),
+        rowIngestionMetersFactory
+    );
+  }
+
   private static DataSchema getDataSchema(String dataSource)
   {
     List<DimensionSchema> dimensions = new ArrayList<>();
@@ -3004,10 +3426,54 @@ public class KafkaSupervisorTest extends EasyMockSupport
       DateTime maximumMessageTime
   )
   {
+    return createKafkaIndexTask(
+        id,
+        taskGroupId,
+        startPartitions,
+        endPartitions,
+        minimumMessageTime,
+        maximumMessageTime,
+        getDataSchema(dataSource)
+    );
+  }
+
+  private KafkaIndexTask createKafkaIndexTask(
+      String id,
+      int taskGroupId,
+      SeekableStreamStartSequenceNumbers<Integer, Long> startPartitions,
+      SeekableStreamEndSequenceNumbers<Integer, Long> endPartitions,
+      DateTime minimumMessageTime,
+      DateTime maximumMessageTime,
+      DataSchema schema
+  )
+  {
+    return createKafkaIndexTask(
+        id,
+        taskGroupId,
+        startPartitions,
+        endPartitions,
+        minimumMessageTime,
+        maximumMessageTime,
+        schema,
+        (KafkaIndexTaskTuningConfig) tuningConfig.convertToTaskTuningConfig()
+    );
+  }
+
+  private KafkaIndexTask createKafkaIndexTask(
+      String id,
+      int taskGroupId,
+      SeekableStreamStartSequenceNumbers<Integer, Long> startPartitions,
+      SeekableStreamEndSequenceNumbers<Integer, Long> endPartitions,
+      DateTime minimumMessageTime,
+      DateTime maximumMessageTime,
+      DataSchema schema,
+      KafkaIndexTaskTuningConfig tuningConfig
+  )
+  {
     return new KafkaIndexTask(
         id,
         null,
-        getDataSchema(dataSource),
+        schema,
         tuningConfig,
         new KafkaIndexTaskIOConfig(
             taskGroupId,
@@ -3060,7 +3526,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
     {
       return dataSource;
     }
-
   }
 
   private static class TestableKafkaSupervisor extends KafkaSupervisor
@@ -3090,7 +3555,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
     protected String generateSequenceName(
         Map<Integer, Long> startPartitions,
         Optional<DateTime> minimumMessageTime,
-        Optional<DateTime> maximumMessageTime
+        Optional<DateTime> maximumMessageTime,
+        DataSchema dataSchema,
+        SeekableStreamIndexTaskTuningConfig tuningConfig
     )
     {
       final int groupId = getTaskGroupIdForPartition(startPartitions.keySet().iterator().next());
@@ -3098,5 +3565,37 @@ public class KafkaSupervisorTest extends EasyMockSupport
     }
   }
 
+  private static class TestableKafkaSupervisorWithCustomIsTaskCurrent extends TestableKafkaSupervisor
+  {
+    private boolean isTaskCurrentReturn;
+
+    public TestableKafkaSupervisorWithCustomIsTaskCurrent(
+        TaskStorage taskStorage,
+        TaskMaster taskMaster,
+        IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
+        KafkaIndexTaskClientFactory taskClientFactory,
+        ObjectMapper mapper,
+        KafkaSupervisorSpec spec,
+        RowIngestionMetersFactory rowIngestionMetersFactory,
+        boolean isTaskCurrentReturn
+    )
+    {
+      super(
+          taskStorage,
+          taskMaster,
+          indexerMetadataStorageCoordinator,
+          taskClientFactory,
+          mapper,
+          spec,
+          rowIngestionMetersFactory
+      );
+      this.isTaskCurrentReturn = isTaskCurrentReturn;
+    }
 
+    @Override
+    public boolean isTaskCurrent(int taskGroupId, String taskId)
+    {
+      return isTaskCurrentReturn;
+    }
+  }
 }
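The TestableKafkaSupervisorWithCustomIsTaskCurrent subclass above is a plain test double: it pins isTaskCurrent() to a constant so that testDoNotKillCompatibleTasks and testKillIncompatibleTasks can verify the supervisor's adopt-versus-kill behavior in isolation, while the hash comparison itself gets direct coverage in testIsTaskCurrent.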
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
new file mode 100644
index 0000000..3cc124f
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka.test;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.io.File;
+
+@JsonTypeName("KafkaTuningConfig")
+public class TestModifiedKafkaIndexTaskTuningConfig extends KafkaIndexTaskTuningConfig
+{
+  private final String extra;
+
+  @JsonCreator
+  public TestModifiedKafkaIndexTaskTuningConfig(
+      @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
+      @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
+      @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
+      @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
+      @JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod,
+      @JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory,
+      @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
+      @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
+      // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
+      @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly,
+      @Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions,
+      @JsonProperty("handoffConditionTimeout") @Nullable Long handoffConditionTimeout,
+      @JsonProperty("resetOffsetAutomatically") @Nullable Boolean resetOffsetAutomatically,
+      @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod,
+      @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
+      @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
+      @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
+      @JsonProperty("extra") String extra
+  )
+  {
+    super(
+        maxRowsInMemory,
+        maxBytesInMemory,
+        maxRowsPerSegment,
+        maxTotalRows,
+        intermediatePersistPeriod,
+        basePersistDirectory,
+        maxPendingPersists,
+        indexSpec,
+        true,
+        reportParseExceptions,
+        handoffConditionTimeout,
+        resetOffsetAutomatically,
+        segmentWriteOutMediumFactory,
+        intermediateHandoffPeriod,
+        logParseExceptions,
+        maxParseExceptions,
+        maxSavedParseExceptions
+    );
+    this.extra = extra;
+  }
+
+  @JsonProperty("extra")
+  public String getExtra()
+  {
+    return extra;
+  }
+}
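TestModifiedKafkaIndexTaskTuningConfig above exists only to stand in for a tuning config from a different Druid version: it extends KafkaIndexTaskTuningConfig with one added "extra" field, and the testSerdeWithModifiedTuningConfig* tests round-trip JSON between the two shapes in both directions. A standalone sketch of the same Jackson pattern, using hypothetical ConfigV1/ConfigV2 classes rather than Druid's (Druid's DefaultObjectMapper tolerates unknown fields; a plain ObjectMapper needs that feature disabled, as below):

    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.DeserializationFeature;
    import com.fasterxml.jackson.databind.ObjectMapper;

    // Minimal model (not Druid code) of the added/removed-field serde tests:
    // JSON written by one "version" must deserialize under another version
    // whose class has gained or lost a field.
    public class SerdeCompatDemo
    {
      public static class ConfigV1
      {
        @JsonProperty
        public final int maxRowsInMemory;

        @JsonCreator
        public ConfigV1(@JsonProperty("maxRowsInMemory") int maxRowsInMemory)
        {
          this.maxRowsInMemory = maxRowsInMemory;
        }
      }

      public static class ConfigV2 extends ConfigV1
      {
        @JsonProperty
        public final String extra; // new in "v2"; null when absent from JSON

        @JsonCreator
        public ConfigV2(
            @JsonProperty("maxRowsInMemory") int maxRowsInMemory,
            @JsonProperty("extra") String extra
        )
        {
          super(maxRowsInMemory);
          this.extra = extra;
        }
      }

      public static void main(String[] args) throws Exception
      {
        ObjectMapper mapper = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

        // "Added field": v1 JSON read as v2; the missing field comes back null.
        ConfigV2 v2 = mapper.readValue(mapper.writeValueAsString(new ConfigV1(1)), ConfigV2.class);

        // "Removed field": v2 JSON read as v1; the unknown field is ignored.
        ConfigV1 v1 = mapper.readValue(mapper.writeValueAsString(new ConfigV2(1, "x")), ConfigV1.class);

        System.out.println(v2.extra + " / " + v1.maxRowsInMemory);
      }
    }
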
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
index 2983324..57ffebc 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTuningConfig;
+import org.apache.druid.indexing.kinesis.test.TestModifiedKinesisIndexTaskTuningConfig;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.indexing.TuningConfig;
@@ -34,6 +35,7 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.io.File;
+import java.io.IOException;
 
 public class KinesisIndexTaskTuningConfigTest
 {
@@ -131,6 +133,121 @@ public class KinesisIndexTaskTuningConfigTest
   }
 
   @Test
+  public void testSerdeWithModifiedTuningConfigAddedField() throws IOException
+  {
+    KinesisIndexTaskTuningConfig base = new KinesisIndexTaskTuningConfig(
+        1,
+        3L,
+        2,
+        100L,
+        new Period("PT3S"),
+        new File("/tmp/xxx"),
+        4,
+        new IndexSpec(),
+        true,
+        true,
+        5L,
+        true,
+        false,
+        1000,
+        1000,
+        500,
+        null,
+        42,
+        null,
+        false,
+        500,
+        500,
+        6000,
+        new Period("P3D")
+    );
+
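+    // Round-trip: serialize the base config, then deserialize it as the modified class.
+    // The "extra" field is absent from the JSON, so it should come back null.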
+    String serialized = mapper.writeValueAsString(base);
+    TestModifiedKinesisIndexTaskTuningConfig deserialized =
+        mapper.readValue(serialized, TestModifiedKinesisIndexTaskTuningConfig.class);
+
+    Assert.assertNull(deserialized.getExtra());
+    Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory());
+    Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory());
+    Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
+    Assert.assertEquals(base.getMaxTotalRows(), deserialized.getMaxTotalRows());
+    Assert.assertEquals(base.getIntermediatePersistPeriod(), deserialized.getIntermediatePersistPeriod());
+    Assert.assertEquals(base.getBasePersistDirectory(), deserialized.getBasePersistDirectory());
+    Assert.assertEquals(base.getMaxPendingPersists(), deserialized.getMaxPendingPersists());
+    Assert.assertEquals(base.getIndexSpec(), deserialized.getIndexSpec());
+    Assert.assertEquals(base.getBuildV9Directly(), deserialized.getBuildV9Directly());
+    Assert.assertEquals(base.isReportParseExceptions(), deserialized.isReportParseExceptions());
+    Assert.assertEquals(base.getHandoffConditionTimeout(), deserialized.getHandoffConditionTimeout());
+    Assert.assertEquals(base.isResetOffsetAutomatically(), deserialized.isResetOffsetAutomatically());
+    Assert.assertEquals(base.getSegmentWriteOutMediumFactory(), deserialized.getSegmentWriteOutMediumFactory());
+    Assert.assertEquals(base.getIntermediateHandoffPeriod(), deserialized.getIntermediateHandoffPeriod());
+    Assert.assertEquals(base.isLogParseExceptions(), deserialized.isLogParseExceptions());
+    Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions());
+    Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
+    Assert.assertEquals(base.getRecordBufferFullWait(), deserialized.getRecordBufferFullWait());
+    Assert.assertEquals(base.getRecordBufferOfferTimeout(), deserialized.getRecordBufferOfferTimeout());
+    Assert.assertEquals(base.getRecordBufferSize(), deserialized.getRecordBufferSize());
+    Assert.assertEquals(base.getMaxRecordsPerPoll(), deserialized.getMaxRecordsPerPoll());
+  }
+
+  @Test
+  public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException
+  {
+    KinesisIndexTaskTuningConfig base = new KinesisIndexTaskTuningConfig(
+        1,
+        3L,
+        2,
+        100L,
+        new Period("PT3S"),
+        new File("/tmp/xxx"),
+        4,
+        new IndexSpec(),
+        true,
+        true,
+        5L,
+        true,
+        false,
+        1000,
+        1000,
+        500,
+        null,
+        42,
+        null,
+        false,
+        500,
+        500,
+        6000,
+        new Period("P3D")
+    );
+
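+    // Reverse direction: serialize the modified config (with the "extra" field set) and
+    // deserialize it as the base class, which should ignore the unknown field.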
+    String serialized = mapper.writeValueAsString(new TestModifiedKinesisIndexTaskTuningConfig(base, "loool"));
+    KinesisIndexTaskTuningConfig deserialized =
+        mapper.readValue(serialized, KinesisIndexTaskTuningConfig.class);
+
+    Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory());
+    Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory());
+    Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
+    Assert.assertEquals(base.getMaxTotalRows(), deserialized.getMaxTotalRows());
+    Assert.assertEquals(base.getIntermediatePersistPeriod(), deserialized.getIntermediatePersistPeriod());
+    Assert.assertEquals(base.getBasePersistDirectory(), deserialized.getBasePersistDirectory());
+    Assert.assertEquals(base.getMaxPendingPersists(), deserialized.getMaxPendingPersists());
+    Assert.assertEquals(base.getIndexSpec(), deserialized.getIndexSpec());
+    Assert.assertEquals(base.getBuildV9Directly(), deserialized.getBuildV9Directly());
+    Assert.assertEquals(base.isReportParseExceptions(), deserialized.isReportParseExceptions());
+    Assert.assertEquals(base.getHandoffConditionTimeout(), deserialized.getHandoffConditionTimeout());
+    Assert.assertEquals(base.isResetOffsetAutomatically(), deserialized.isResetOffsetAutomatically());
+    Assert.assertEquals(base.getSegmentWriteOutMediumFactory(), deserialized.getSegmentWriteOutMediumFactory());
+    Assert.assertEquals(base.getIntermediateHandoffPeriod(), deserialized.getIntermediateHandoffPeriod());
+    Assert.assertEquals(base.isLogParseExceptions(), deserialized.isLogParseExceptions());
+    Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions());
+    Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
+    Assert.assertEquals(base.getRecordBufferFullWait(), deserialized.getRecordBufferFullWait());
+    Assert.assertEquals(base.getRecordBufferOfferTimeout(), deserialized.getRecordBufferOfferTimeout());
+    Assert.assertEquals(base.getRecordBufferSize(), deserialized.getRecordBufferSize());
+    Assert.assertEquals(base.getMaxRecordsPerPoll(), deserialized.getMaxRecordsPerPoll());
+  }
+
+  @Test
   public void testResetOffsetAndSkipSequenceNotBothTrue() throws Exception
   {
     String jsonStr = "{\n"
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index b42e356..cc5ae9e 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -22,7 +22,6 @@ package org.apache.druid.indexing.kinesis.supervisor;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -46,6 +45,7 @@ import org.apache.druid.indexing.kinesis.KinesisIndexTask;
 import org.apache.druid.indexing.kinesis.KinesisIndexTaskClient;
 import org.apache.druid.indexing.kinesis.KinesisIndexTaskClientFactory;
 import org.apache.druid.indexing.kinesis.KinesisIndexTaskIOConfig;
+import org.apache.druid.indexing.kinesis.KinesisIndexTaskTuningConfig;
 import org.apache.druid.indexing.kinesis.KinesisRecordSupplier;
 import org.apache.druid.indexing.kinesis.KinesisSequenceNumber;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
@@ -58,6 +58,7 @@ import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
 import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
 import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
@@ -68,6 +69,7 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.EntryExistsException;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.segment.TestHelper;
@@ -112,8 +114,6 @@ import static org.easymock.EasyMock.expectLastCall;
 
 public class KinesisSupervisorTest extends EasyMockSupport
 {
-
-
   private static final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
   private static final String DATASOURCE = "testDS";
   private static final int TEST_CHAT_THREADS = 3;
@@ -209,7 +209,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   @Test
   public void testNoInitialState() throws Exception
   {
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
 
     supervisorRecordSupplier.assign(anyObject());
     expectLastCall().anyTimes();
@@ -274,7 +274,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   @Test
   public void testMultiTask() throws Exception
   {
-    supervisor = getSupervisor(1, 2, true, "PT1H", null, null);
+    supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null);
 
     supervisorRecordSupplier.assign(anyObject());
     expectLastCall().anyTimes();
@@ -331,7 +331,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   @Test
   public void testReplicas() throws Exception
   {
-    supervisor = getSupervisor(2, 1, true, "PT1H", null, null);
+    supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null);
 
     supervisorRecordSupplier.assign(anyObject());
     expectLastCall().anyTimes();
@@ -405,7 +405,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   @Test
   public void testLateMessageRejectionPeriod() throws Exception
   {
-    supervisor = getSupervisor(2, 1, true, "PT1H", new Period("PT1H"), null);
+    supervisor = getTestableSupervisor(2, 1, true, "PT1H", new Period("PT1H"), null);
 
     supervisorRecordSupplier.assign(anyObject());
     expectLastCall().anyTimes();
@@ -454,7 +454,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   @Test
   public void testEarlyMessageRejectionPeriod() throws Exception
   {
-    supervisor = getSupervisor(2, 1, true, "PT1H", null, new Period("PT1H"));
+    supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, new Period("PT1H"));
 
     supervisorRecordSupplier.assign(anyObject());
     expectLastCall().anyTimes();
@@ -508,7 +508,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
    */
   public void testDatasourceMetadata() throws Exception
   {
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
 
     supervisorRecordSupplier.assign(anyObject());
     expectLastCall().anyTimes();
@@ -529,7 +529,11 @@ public class KinesisSupervisorTest extends EasyMockSupport
     EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
     EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
         new KinesisDataSourceMetadata(
-            new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2", shardId0, "1"), ImmutableSet.of())
+            new SeekableStreamStartSequenceNumbers<>(
+                stream,
+                ImmutableMap.of(shardId1, "2", shardId0, "1"),
+                ImmutableSet.of()
+            )
         )
     ).anyTimes();
 
@@ -556,7 +560,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   @Test(expected = ISE.class)
   public void testBadMetadataOffsets() throws Exception
   {
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
 
     supervisorRecordSupplier.assign(anyObject());
     expectLastCall().anyTimes();
@@ -575,7 +579,11 @@ public class KinesisSupervisorTest extends EasyMockSupport
     EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
     EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
         new KinesisDataSourceMetadata(
-            new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "101", shardId0, "-1"), ImmutableSet.of())
+            new SeekableStreamStartSequenceNumbers<>(
+                stream,
+                ImmutableMap.of(shardId1, "101", shardId0, "-1"),
+                ImmutableSet.of()
+            )
         )
     ).anyTimes();
     replayAll();
@@ -585,9 +593,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
   }
 
   @Test
-  public void testKillIncompatibleTasks() throws Exception
+  public void testDontKillTasksWithMismatchedDatasourceAndType() throws Exception
   {
-    supervisor = getSupervisor(2, 1, true, "PT1H", null, null);
+    supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null);
 
     supervisorRecordSupplier.assign(anyObject());
     expectLastCall().anyTimes();
@@ -601,48 +609,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
     supervisorRecordSupplier.seek(anyObject(), anyString());
     expectLastCall().anyTimes();
 
-    // unexpected # of partitions (kill)
+    // different datasource (don't kill)
     Task id1 = createKinesisIndexTask(
         "id1",
-        DATASOURCE,
-        1,
-        new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "0"), ImmutableSet.of()),
-        new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "1")),
-        null,
-        null
-    );
-
-    // correct number of partitions and ranges (don't kill)
-    Task id2 = createKinesisIndexTask(
-        "id2",
-        DATASOURCE,
-        0,
-        new SeekableStreamStartSequenceNumbers<>(
-            stream,
-            ImmutableMap.of(shardId0, "0", shardId1, "0"), ImmutableSet.of(shardId0, shardId1)
-        ),
-        new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "1", shardId1, "12")),
-        null,
-        null
-    );
-
-    // unexpected range on partition 2 (kill)
-    Task id3 = createKinesisIndexTask(
-        "id3",
-        DATASOURCE,
-        1,
-        new SeekableStreamStartSequenceNumbers<>(
-            stream,
-            ImmutableMap.of(shardId0, "0", shardId1, "1"), ImmutableSet.of(shardId0, shardId1)
-        ),
-        new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "1", shardId1, "11")),
-        null,
-        null
-    );
-
-    // different datasource (don't kill)
-    Task id4 = createKinesisIndexTask(
-        "id4",
         "other-datasource",
         2,
         new SeekableStreamStartSequenceNumbers<>(
@@ -656,8 +625,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
     );
 
     // non KinesisIndexTask (don't kill)
-    Task id5 = new RealtimeIndexTask(
-        "id5",
+    Task id2 = new RealtimeIndexTask(
+        "id2",
         null,
         new FireDepartment(
             dataSchema,
@@ -667,18 +636,12 @@ public class KinesisSupervisorTest extends EasyMockSupport
         null
     );
 
-    List<Task> existingTasks = ImmutableList.of(id1, id2, id3, id4, id5);
+    List<Task> existingTasks = ImmutableList.of(id1, id2);
 
     EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
     EasyMock.expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes();
-    EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
-    EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
-    EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
-    EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
-    EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
-    EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
     EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
             .andReturn(Futures.immediateFuture(Status.NOT_STARTED))
             .anyTimes();
@@ -690,24 +653,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
             null
         )
     ).anyTimes();
-    EasyMock.expect(taskClient.stopAsync("id1", false)).andReturn(Futures.immediateFuture(true));
-    EasyMock.expect(taskClient.stopAsync("id3", false)).andReturn(Futures.immediateFuture(false));
     taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
-    taskQueue.shutdown("id3", "Task [%s] failed to stop in a timely manner, killing task", "id3");
-
-    EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true);
-
-    TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
-    checkpoints.put(0, ImmutableMap.of(
-        shardId0,
-        "0",
-        shardId1,
-        "0"
-    ));
 
-    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
-            .andReturn(Futures.immediateFuture(checkpoints))
-            .times(2);
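+    // Neither existing task is adopted (wrong datasource / wrong task type), so the
+    // supervisor creates two new tasks to satisfy the 2-replica configuration.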
+    EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true).times(2);
 
     replayAll();
     supervisor.start();
@@ -718,7 +666,14 @@ public class KinesisSupervisorTest extends EasyMockSupport
   @Test
   public void testKillBadPartitionAssignment() throws Exception
   {
-    supervisor = getSupervisor(1, 2, true, "PT1H", null, null);
+    supervisor = getTestableSupervisor(
+        1,
+        2,
+        true,
+        "PT1H",
+        null,
+        null
+    );
 
     supervisorRecordSupplier.assign(anyObject());
     expectLastCall().anyTimes();
@@ -771,17 +726,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
         null,
         null
     );
-    Task id5 = createKinesisIndexTask(
-        "id5",
-        DATASOURCE,
-        0,
-        new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "0"), ImmutableSet.of(shardId0)),
-        new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "1")),
-        null,
-        null
-    );
 
-    List<Task> existingTasks = ImmutableList.of(id1, id2, id3, id4, id5);
+    List<Task> existingTasks = ImmutableList.of(id1, id2, id3, id4);
 
     EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
@@ -791,12 +737,10 @@ public class KinesisSupervisorTest extends EasyMockSupport
     EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
     EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
     EasyMock.expect(taskStorage.getStatus("id4")).andReturn(Optional.of(TaskStatus.running("id4"))).anyTimes();
-    EasyMock.expect(taskStorage.getStatus("id5")).andReturn(Optional.of(TaskStatus.running("id5"))).anyTimes();
     EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
     EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
     EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
     EasyMock.expect(taskStorage.getTask("id4")).andReturn(Optional.of(id4)).anyTimes();
-    EasyMock.expect(taskStorage.getTask("id5")).andReturn(Optional.of(id5)).anyTimes();
     EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
             .andReturn(Futures.immediateFuture(Status.NOT_STARTED))
             .anyTimes();
@@ -809,24 +753,26 @@ public class KinesisSupervisorTest extends EasyMockSupport
         )
     ).anyTimes();
     EasyMock.expect(taskClient.stopAsync("id3", false)).andReturn(Futures.immediateFuture(true));
-    EasyMock.expect(taskClient.stopAsync("id4", false)).andReturn(Futures.immediateFuture(false));
-    EasyMock.expect(taskClient.stopAsync("id5", false)).andReturn(Futures.immediateFuture((Boolean) null));
 
     TreeMap<Integer, Map<String, String>> checkpoints1 = new TreeMap<>();
     checkpoints1.put(0, ImmutableMap.of(shardId1, "0"));
     TreeMap<Integer, Map<String, String>> checkpoints2 = new TreeMap<>();
     checkpoints2.put(0, ImmutableMap.of(shardId0, "0"));
+    TreeMap<Integer, Map<String, String>> checkpoints4 = new TreeMap<>();
+    checkpoints4.put(0, ImmutableMap.of(shardId0, "0"));
     EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
             .andReturn(Futures.immediateFuture(checkpoints1))
             .times(1);
     EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
             .andReturn(Futures.immediateFuture(checkpoints2))
             .times(1);
+    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id4"), EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(checkpoints4))
+            .times(1);
 
 
     taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
-    taskQueue.shutdown("id4", "Task [%s] failed to stop in a timely manner, killing task", "id4");
-    taskQueue.shutdown("id5", "Task [%s] failed to stop in a timely manner, killing task", "id5");
+    taskQueue.shutdown("id4", "Task [%s] failed to return status, killing task", "id4");
     replayAll();
 
     supervisor.start();
@@ -837,7 +783,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   @Test
   public void testRequeueTaskWhenFailed() throws Exception
   {
-    supervisor = getSupervisor(2, 2, true, "PT1H", null, null);
+    supervisor = getTestableSupervisor(2, 2, true, "PT1H", null, null);
     supervisorRecordSupplier.assign(anyObject());
     expectLastCall().anyTimes();
     expect(supervisorRecordSupplier.getPartitionIds(stream)).andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes();
@@ -946,7 +892,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   @Test
   public void testRequeueAdoptedTaskWhenFailed() throws Exception
   {
-    supervisor = getSupervisor(2, 1, true, "PT1H", null, null);
+    supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null);
     supervisorRecordSupplier.assign(anyObject());
     expectLastCall().anyTimes();
     expect(supervisorRecordSupplier.getPartitionIds(stream)).andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes();
@@ -965,7 +911,11 @@ public class KinesisSupervisorTest extends EasyMockSupport
         "id1",
         DATASOURCE,
         0,
-        new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(shardId1, "0", shardId0, "0"), ImmutableSet.of()),
+        new SeekableStreamStartSequenceNumbers<>(
+            "stream",
+            ImmutableMap.of(shardId1, "0", shardId0, "0"),
+            ImmutableSet.of()
+        ),
         new SeekableStreamEndSequenceNumbers<>(
             "stream",
             ImmutableMap.of(
@@ -1076,7 +1026,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   @Test
   public void testQueueNextTasksOnSuccess() throws Exception
   {
-    supervisor = getSupervisor(2, 2, true, "PT1H", null, null);
+    supervisor = getTestableSupervisor(2, 2, true, "PT1H", null, null);
     supervisorRecordSupplier.assign(anyObject());
     expectLastCall().anyTimes();
     expect(supervisorRecordSupplier.getPartitionIds(stream)).andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes();
@@ -1197,7 +1147,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   {
     final TaskLocation location = new TaskLocation("testHost", 1234, -1);
 
-    supervisor = getSupervisor(2, 2, true, "PT1M", null, null);
+    supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null);
 
     supervisorRecordSupplier.assign(anyObject());
     expectLastCall().anyTimes();
@@ -1333,7 +1283,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   {
     final TaskLocation location = new TaskLocation("testHost", 1234, -1);
 
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
 
     supervisorRecordSupplier.assign(anyObject());
     expectLastCall().anyTimes();
@@ -1352,7 +1302,11 @@ public class KinesisSupervisorTest extends EasyMockSupport
         "id1",
         DATASOURCE,
         0,
-        new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(shardId1, "0", shardId0, "0"), ImmutableSet.of()),
+        new SeekableStreamStartSequenceNumbers<>(
+            "stream",
+            ImmutableMap.of(shardId1, "0", shardId0, "0"),
+            ImmutableSet.of()
+        ),
         new SeekableStreamEndSequenceNumbers<>(
             "stream",
             ImmutableMap.of(
@@ -1482,7 +1436,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   {
     final TaskLocation location = new TaskLocation("testHost", 1234, -1);
 
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
     supervisorRecordSupplier.assign(anyObject());
     expectLastCall().anyTimes();
     expect(supervisorRecordSupplier.getPartitionIds(stream)).andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes();
@@ -1500,7 +1454,11 @@ public class KinesisSupervisorTest extends EasyMockSupport
         "id1",
         DATASOURCE,
         0,
-        new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(shardId1, "0", shardId0, "0"), ImmutableSet.of()),
+        new SeekableStreamStartSequenceNumbers<>(
+            "stream",
+            ImmutableMap.of(shardId1, "0", shardId0, "0"),
+            ImmutableSet.of()
+        ),
         new SeekableStreamEndSequenceNumbers<>(
             "stream",
             ImmutableMap.of(
@@ -1621,7 +1579,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
     final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
     final DateTime startTime = DateTimes.nowUtc();
 
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
 
     supervisorRecordSupplier.assign(anyObject());
     expectLastCall().anyTimes();
@@ -1791,7 +1749,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   @Test
   public void testKillUnresponsiveTasksWhileGettingStartTime() throws Exception
   {
-    supervisor = getSupervisor(2, 2, true, "PT1H", null, null);
+    supervisor = getTestableSupervisor(2, 2, true, "PT1H", null, null);
     supervisorRecordSupplier.assign(anyObject());
     expectLastCall().anyTimes();
     expect(supervisorRecordSupplier.getPartitionIds(stream)).andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes();
@@ -1870,7 +1828,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   {
     final TaskLocation location = new TaskLocation("testHost", 1234, -1);
 
-    supervisor = getSupervisor(2, 2, true, "PT1M", null, null);
+    supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null);
     supervisorRecordSupplier.assign(anyObject());
     expectLastCall().anyTimes();
     expect(supervisorRecordSupplier.getPartitionIds(stream)).andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes();
@@ -1975,7 +1933,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   {
     final TaskLocation location = new TaskLocation("testHost", 1234, -1);
 
-    supervisor = getSupervisor(2, 2, true, "PT1M", null, null);
+    supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null);
     supervisorRecordSupplier.assign(anyObject());
     expectLastCall().anyTimes();
     expect(supervisorRecordSupplier.getPartitionIds(stream)).andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes();
@@ -2091,7 +2049,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   @Test(expected = IllegalStateException.class)
   public void testStopNotStarted()
   {
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
     supervisor.stop(false);
   }
 
@@ -2106,7 +2064,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
     taskRunner.unregisterListener(StringUtils.format("KinesisSupervisor-%s", DATASOURCE));
     replayAll();
 
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
     supervisor.start();
     supervisor.stop(false);
 
@@ -2120,7 +2078,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
     final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
     final DateTime startTime = DateTimes.nowUtc();
 
-    supervisor = getSupervisor(2, 1, true, "PT1H", null, null);
+    supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null);
     supervisorRecordSupplier.assign(anyObject());
     expectLastCall().anyTimes();
     expect(supervisorRecordSupplier.getPartitionIds(stream)).andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes();
@@ -2138,7 +2096,11 @@ public class KinesisSupervisorTest extends EasyMockSupport
         "id1",
         DATASOURCE,
         0,
-        new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(shardId1, "0", shardId0, "0"), ImmutableSet.of()),
+        new SeekableStreamStartSequenceNumbers<>(
+            "stream",
+            ImmutableMap.of(shardId1, "0", shardId0, "0"),
+            ImmutableSet.of()
+        ),
         new SeekableStreamEndSequenceNumbers<>(
             "stream",
             ImmutableMap.of(
@@ -2296,7 +2258,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
     taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
     replayAll();
 
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
 
 
     supervisor.start();
@@ -2316,7 +2278,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   public void testResetDataSourceMetadata() throws Exception
   {
     expect(supervisorRecordSupplier.getPartitionIds(anyObject())).andReturn(Collections.emptySet()).anyTimes();
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
     EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
@@ -2393,7 +2355,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   public void testResetNoDataSourceMetadata() throws Exception
   {
     expect(supervisorRecordSupplier.getPartitionIds(anyObject())).andReturn(Collections.emptySet()).anyTimes();
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
     EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
@@ -2429,7 +2391,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
     final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
     final DateTime startTime = DateTimes.nowUtc();
 
-    supervisor = getSupervisor(2, 1, true, "PT1H", null, null);
+    supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null);
     supervisorRecordSupplier.assign(anyObject());
     expectLastCall().anyTimes();
     expect(supervisorRecordSupplier.getPartitionIds(stream)).andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes();
@@ -2447,7 +2409,11 @@ public class KinesisSupervisorTest extends EasyMockSupport
         "id1",
         DATASOURCE,
         0,
-        new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(shardId1, "0", shardId0, "0"), ImmutableSet.of()),
+        new SeekableStreamStartSequenceNumbers<>(
+            "stream",
+            ImmutableMap.of(shardId1, "0", shardId0, "0"),
+            ImmutableSet.of()
+        ),
         new SeekableStreamEndSequenceNumbers<>(
             "stream",
             ImmutableMap.of(
@@ -2580,7 +2546,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   public void testNoDataIngestionTasks() throws Exception
   {
     final DateTime startTime = DateTimes.nowUtc();
-    supervisor = getSupervisor(2, 1, true, "PT1H", null, null);
+    supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null);
 
     //not adding any events
     Task id1 = createKinesisIndexTask(
@@ -2725,7 +2691,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   public void testCheckpointForInactiveTaskGroup()
       throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException
   {
-    supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false);
+    supervisor = getTestableSupervisor(2, 1, true, "PT1S", null, null, false);
     //not adding any events
     final Task id1;
     id1 = createKinesisIndexTask(
@@ -2863,7 +2829,11 @@ public class KinesisSupervisorTest extends EasyMockSupport
         new KinesisDataSourceMetadata(
             new SeekableStreamStartSequenceNumbers<>(stream, checkpoints.get(0), checkpoints.get(0).keySet())
         ),
-        new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(stream, fakeCheckpoints, ImmutableSet.of()))
+        new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
+            stream,
+            fakeCheckpoints,
+            ImmutableSet.of()
+        ))
     );
 
     while (supervisor.getNoticesQueueSize() > 0) {
@@ -2881,7 +2851,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   public void testCheckpointForUnknownTaskGroup()
       throws InterruptedException
   {
-    supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false);
+    supervisor = getTestableSupervisor(2, 1, true, "PT1S", null, null, false);
 
 
     supervisorRecordSupplier.assign(anyObject());
@@ -2985,8 +2955,16 @@ public class KinesisSupervisorTest extends EasyMockSupport
     supervisor.checkpoint(
         0,
         ((KinesisIndexTask) id1).getIOConfig().getBaseSequenceName(),
-        new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(stream, Collections.emptyMap(), ImmutableSet.of())),
-        new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(stream, Collections.emptyMap(), ImmutableSet.of()))
+        new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
+            stream,
+            Collections.emptyMap(),
+            ImmutableSet.of()
+        )),
+        new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
+            stream,
+            Collections.emptyMap(),
+            ImmutableSet.of()
+        ))
     );
 
     while (supervisor.getNoticesQueueSize() > 0) {
@@ -3012,7 +2990,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   public void testCheckpointWithNullTaskGroupId()
       throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException
   {
-    supervisor = getSupervisor(1, 3, true, "PT1S", null, null, false);
+    supervisor = getTestableSupervisor(1, 3, true, "PT1S", null, null, false);
     //not adding any events
     final Task id1 = createKinesisIndexTask(
         "id1",
@@ -3101,8 +3079,16 @@ public class KinesisSupervisorTest extends EasyMockSupport
     supervisor.checkpoint(
         null,
         ((KinesisIndexTask) id1).getIOConfig().getBaseSequenceName(),
-        new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(stream, checkpoints.get(0), ImmutableSet.of())),
-        new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(stream, newCheckpoints.get(0), ImmutableSet.of()))
+        new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
+            stream,
+            checkpoints.get(0),
+            ImmutableSet.of()
+        )),
+        new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
+            stream,
+            newCheckpoints.get(0),
+            ImmutableSet.of()
+        ))
     );
 
     while (supervisor.getNoticesQueueSize() > 0) {
@@ -3116,7 +3102,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   @Test
   public void testSuspendedNoRunningTasks() throws Exception
   {
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, true);
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, true);
 
     expect(supervisorRecordSupplier.getPartitionIds(anyObject())).andReturn(Collections.emptySet()).anyTimes();
     EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -3149,7 +3135,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
     final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
     final DateTime startTime = DateTimes.nowUtc();
 
-    supervisor = getSupervisor(2, 1, true, "PT1H", null, null, true);
+    supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null, true);
 
     supervisorRecordSupplier.assign(anyObject());
     expectLastCall().anyTimes();
@@ -3169,7 +3155,11 @@ public class KinesisSupervisorTest extends EasyMockSupport
         "id1",
         DATASOURCE,
         0,
-        new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(shardId1, "0", shardId0, "0"), ImmutableSet.of()),
+        new SeekableStreamStartSequenceNumbers<>(
+            "stream",
+            ImmutableMap.of(shardId1, "0", shardId0, "0"),
+            ImmutableSet.of()
+        ),
         new SeekableStreamEndSequenceNumbers<>(
             "stream",
             ImmutableMap.of(
@@ -3314,7 +3304,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
     taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
     replayAll();
 
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, true);
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, true);
     supervisor.start();
     supervisor.runInternal();
     verifyAll();
@@ -3331,7 +3321,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   @Test
   public void testGetCurrentTotalStats()
   {
-    supervisor = getSupervisor(1, 2, true, "PT1H", null, null, false);
+    supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null, false);
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         supervisor.getTaskGroupIdForPartition("0"),
         ImmutableMap.of("0", "0"),
@@ -3372,81 +3362,393 @@ public class KinesisSupervisorTest extends EasyMockSupport
     Assert.assertEquals(ImmutableMap.of("task2", ImmutableMap.of("prop2", "val2")), stats.get("1"));
   }
 
-  private KinesisSupervisor getSupervisor(
-      int replicas,
-      int taskCount,
-      boolean useEarliestOffset,
-      String duration,
-      Period lateMessageRejectionPeriod,
-      Period earlyMessageRejectionPeriod
-  )
+  @Test
+  public void testDoNotKillCompatibleTasks()
+      throws InterruptedException, EntryExistsException, ExecutionException, TimeoutException, JsonProcessingException
   {
-    return getSupervisor(
-        replicas,
-        taskCount,
-        useEarliestOffset,
-        duration,
-        lateMessageRejectionPeriod,
-        earlyMessageRejectionPeriod,
+    // This supervisor always returns true for isTaskCurrent -> it should not kill its tasks
+    int numReplicas = 2;
+    supervisor = getTestableSupervisorCustomIsTaskCurrent(
+        numReplicas,
+        1,
+        true,
+        "PT1H",
+        new Period("P1D"),
+        new Period("P1D"),
         false,
+        42,
+        1000,
+        true
+    );
+
+    supervisorRecordSupplier.assign(EasyMock.anyObject());
+    EasyMock.expectLastCall().anyTimes();
+    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(stream))
+            .andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes();
+    EasyMock.expect(supervisorRecordSupplier.getAssignment())
+            .andReturn(ImmutableSet.of(shard1Partition, shard0Partition))
+            .anyTimes();
+    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
+    EasyMock.expectLastCall().anyTimes();
+    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
+    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
+    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
+    EasyMock.expectLastCall().anyTimes();
+
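+    // An existing task whose partition offsets match what the supervisor expects; because
+    // isTaskCurrent is forced to true, the supervisor should adopt it rather than kill it.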
+    Task task = createKinesisIndexTask(
+        "id2",
+        DATASOURCE,
+        0,
+        new SeekableStreamStartSequenceNumbers<>(
+            stream,
+            ImmutableMap.of(
+                shardId0,
+                "0",
+                shardId1,
+                "0"
+            ),
+            ImmutableSet.of()
+        ),
+        new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(
+            shardId0,
+            "1",
+            shardId1,
+            "12"
+        )),
         null,
         null
     );
-  }
 
-  private KinesisSupervisor getSupervisor(
-      int replicas,
-      int taskCount,
-      boolean useEarliestOffset,
-      String duration,
-      Period lateMessageRejectionPeriod,
-      Period earlyMessageRejectionPeriod,
-      boolean suspended,
-      Integer recordsPerFetch,
-      Integer fetchDelayMillis
+    List<Task> existingTasks = ImmutableList.of(task);
 
-  )
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes();
+    EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+    EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(task)).anyTimes();
+    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
+            .andReturn(Futures.immediateFuture(Status.NOT_STARTED))
+            .anyTimes();
+    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
+            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
+            .anyTimes();
+    EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
+        new KinesisDataSourceMetadata(
+            null
+        )
+    ).anyTimes();
+
+    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
+    EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true);
+
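+    // The task group's checkpoints are fetched once per replica, hence times(numReplicas) below.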
+    TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
+    checkpoints.put(0, ImmutableMap.of(
+        shardId0,
+        "0",
+        shardId1,
+        "0"
+    ));
+
+    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(checkpoints))
+            .times(numReplicas);
+
+    replayAll();
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+  }
+
+  @Test
+  public void testKillIncompatibleTasks()
+      throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException, EntryExistsException
   {
-    KinesisSupervisorIOConfig KinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
-        stream,
-        "awsEndpoint",
-        null,
-        replicas,
-        taskCount,
-        new Period(duration),
+    // This supervisor always returns false for isTaskCurrent -> it should kill its tasks
+    int numReplicas = 2;
+    supervisor = getTestableSupervisorCustomIsTaskCurrent(
+        numReplicas,
+        1,
+        true,
+        "PT1H",
         new Period("P1D"),
-        new Period("PT30S"),
-        useEarliestOffset,
-        new Period("PT30M"),
-        lateMessageRejectionPeriod,
-        earlyMessageRejectionPeriod,
-        recordsPerFetch,
-        fetchDelayMillis,
-        null,
-        null,
+        new Period("P1D"),
+        false,
+        42,
+        1000,
         false
     );
+    supervisorRecordSupplier.assign(EasyMock.anyObject());
+    EasyMock.expectLastCall().anyTimes();
+    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(stream))
+            .andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes();
+    EasyMock.expect(supervisorRecordSupplier.getAssignment())
+            .andReturn(ImmutableSet.of(shard1Partition, shard0Partition))
+            .anyTimes();
+    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
+    EasyMock.expectLastCall().anyTimes();
+    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
+    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
+    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
+    EasyMock.expectLastCall().anyTimes();
 
-    KinesisIndexTaskClientFactory taskClientFactory = new KinesisIndexTaskClientFactory(
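+    // Same task setup as the compatible case above, but isTaskCurrent is forced to false,
+    // so the supervisor should stop this task and queue replacement tasks.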
+    Task task = createKinesisIndexTask(
+        "id1",
+        DATASOURCE,
+        0,
+        new SeekableStreamStartSequenceNumbers<>(
+            stream,
+            ImmutableMap.of(
+                shardId0,
+                "0",
+                shardId1,
+                "0"
+            ),
+            ImmutableSet.of()
+        ),
+        new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(
+            shardId0,
+            "1",
+            shardId1,
+            "12"
+        )),
         null,
         null
-    )
-    {
-      @Override
-      public KinesisIndexTaskClient build(
-          TaskInfoProvider taskInfoProvider,
-          String dataSource,
-          int numThreads,
-          Duration httpTimeout,
-          long numRetries
-      )
-      {
-        Assert.assertEquals(TEST_CHAT_THREADS, numThreads);
-        Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), httpTimeout);
-        Assert.assertEquals(TEST_CHAT_RETRIES, numRetries);
-        return taskClient;
-      }
-    };
+    );
+
+    List<Task> existingTasks = ImmutableList.of(task);
+
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes();
+    EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+    EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(task)).anyTimes();
+    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
+            .andReturn(Futures.immediateFuture(Status.NOT_STARTED))
+            .anyTimes();
+    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
+            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
+            .anyTimes();
+    EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
+        new KinesisDataSourceMetadata(
+            null
+        )
+    ).anyTimes();
+    EasyMock.expect(taskClient.stopAsync("id1", false)).andReturn(Futures.immediateFuture(true));
+    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
+    EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true).times(2);
+
+    replayAll();
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+  }
+
+  @Test
+  public void testIsTaskCurrent()
+  {
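+    // Only id1 matches the supervisor's current dataSchema, tuningConfig, and starting
+    // sequence numbers; id2-id4 each differ in one of those and should not be "current".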
+    DateTime minMessageTime = DateTimes.nowUtc();
+    DateTime maxMessageTime = DateTimes.nowUtc().plus(10000);
+
+    KinesisSupervisor supervisor = getSupervisor(
+        1,
+        1,
+        true,
+        "PT1H",
+        new Period("P1D"),
+        new Period("P1D"),
+        false,
+        42,
+        42,
+        dataSchema,
+        tuningConfig
+    );
+
+    supervisor.addTaskGroupToActivelyReadingTaskGroup(
+        42,
+        ImmutableMap.of(shardId1, "3"),
+        Optional.of(minMessageTime),
+        Optional.of(maxMessageTime),
+        ImmutableSet.of("id1", "id2", "id3", "id4"),
+        ImmutableSet.of()
+    );
+
+    DataSchema modifiedDataSchema = getDataSchema("some other datasource");
+
+    KinesisSupervisorTuningConfig modifiedTuningConfig = new KinesisSupervisorTuningConfig(
+        1000,
+        null,
+        50000,
+        null,
+        new Period("P1Y"),
+        new File("/test"),
+        null,
+        null,
+        true,
+        false,
+        null,
+        null,
+        null,
+        null,
+        numThreads,
+        TEST_CHAT_THREADS,
+        TEST_CHAT_RETRIES,
+        TEST_HTTP_TIMEOUT,
+        TEST_SHUTDOWN_TIMEOUT,
+        null,
+        null,
+        null,
+        5000,
+        null,
+        null,
+        null,
+        null,
+        42, // This property is different from tuningConfig
+        null
+    );
+
+    KinesisIndexTask taskFromStorage = createKinesisIndexTask(
+        "id1",
+        0,
+        new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(
+            shardId1,
+            "3"
+        ), ImmutableSet.of()),
+        new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of(
+            shardId1,
+            KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
+        )),
+        minMessageTime,
+        maxMessageTime,
+        dataSchema
+    );
+
+    KinesisIndexTask taskFromStorageMismatchedDataSchema = createKinesisIndexTask(
+        "id2",
+        0,
+        new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(
+            shardId1,
+            "3"
+        ), ImmutableSet.of()),
+        new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of(
+            shardId1,
+            KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
+        )),
+        minMessageTime,
+        maxMessageTime,
+        modifiedDataSchema
+    );
+
+    KinesisIndexTask taskFromStorageMismatchedTuningConfig = createKinesisIndexTask(
+        "id3",
+        0,
+        new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(
+            shardId1,
+            "3"
+        ), ImmutableSet.of()),
+        new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of(
+            shardId1,
+            KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
+        )),
+        minMessageTime,
+        maxMessageTime,
+        dataSchema,
+        modifiedTuningConfig
+    );
+
+    KinesisIndexTask taskFromStorageMismatchedPartitionsWithTaskGroup = createKinesisIndexTask(
+        "id4",
+        0,
+        new SeekableStreamStartSequenceNumbers<>(
+            "stream",
+            ImmutableMap.of(
+                shardId1,
+                "4" // this is the mismatch
+            ),
+            ImmutableSet.of()
+        ),
+        new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of(
+            shardId1,
+            KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
+        )),
+        minMessageTime,
+        maxMessageTime,
+        dataSchema
+    );
+
+    EasyMock.expect(taskStorage.getTask("id1"))
+            .andReturn(Optional.of(taskFromStorage))
+            .once();
+    EasyMock.expect(taskStorage.getTask("id2"))
+            .andReturn(Optional.of(taskFromStorageMismatchedDataSchema))
+            .once();
+    EasyMock.expect(taskStorage.getTask("id3"))
+            .andReturn(Optional.of(taskFromStorageMismatchedTuningConfig))
+            .once();
+    EasyMock.expect(taskStorage.getTask("id4"))
+            .andReturn(Optional.of(taskFromStorageMismatchedPartitionsWithTaskGroup))
+            .once();
+
+    replayAll();
+
+    Assert.assertTrue(supervisor.isTaskCurrent(42, "id1"));
+    Assert.assertFalse(supervisor.isTaskCurrent(42, "id2"));
+    Assert.assertFalse(supervisor.isTaskCurrent(42, "id3"));
+    Assert.assertFalse(supervisor.isTaskCurrent(42, "id4"));
+    verifyAll();
+  }
+
+  private KinesisSupervisor getTestableSupervisor(
+      int replicas,
+      int taskCount,
+      boolean useEarliestOffset,
+      String duration,
+      Period lateMessageRejectionPeriod,
+      Period earlyMessageRejectionPeriod,
+      boolean suspended
+  )
+  {
+    KinesisSupervisorIOConfig KinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
+        stream,
+        "awsEndpoint",
+        null,
+        replicas,
+        taskCount,
+        new Period(duration),
+        new Period("P1D"),
+        new Period("PT30S"),
+        useEarliestOffset,
+        new Period("PT30M"),
+        lateMessageRejectionPeriod,
+        earlyMessageRejectionPeriod,
+        null,
+        null,
+        null,
+        null,
+        false
+    );
+
+    KinesisIndexTaskClientFactory taskClientFactory = new KinesisIndexTaskClientFactory(
+        null,
+        null
+    )
+    {
+      @Override
+      public KinesisIndexTaskClient build(
+          TaskInfoProvider taskInfoProvider,
+          String dataSource,
+          int numThreads,
+          Duration httpTimeout,
+          long numRetries
+      )
+      {
+        Assert.assertEquals(TEST_CHAT_THREADS, numThreads);
+        Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), httpTimeout);
+        Assert.assertEquals(TEST_CHAT_RETRIES, numRetries);
+        return taskClient;
+      }
+    };
 
     return new TestableKinesisSupervisor(
         taskStorage,
@@ -3474,105 +3776,194 @@ public class KinesisSupervisorTest extends EasyMockSupport
     );
   }
 
-  private static DataSchema getDataSchema(String dataSource)
+  private KinesisSupervisor getTestableSupervisor(
+      int replicas,
+      int taskCount,
+      boolean useEarliestOffset,
+      String duration,
+      Period lateMessageRejectionPeriod,
+      Period earlyMessageRejectionPeriod
+  )
   {
-    List<DimensionSchema> dimensions = new ArrayList<>();
-    dimensions.add(StringDimensionSchema.create("dim1"));
-    dimensions.add(StringDimensionSchema.create("dim2"));
-
-    return new DataSchema(
-        dataSource,
-        objectMapper.convertValue(
-            new StringInputRowParser(
-                new JSONParseSpec(
-                    new TimestampSpec("timestamp", "iso", null),
-                    new DimensionsSpec(
-                        dimensions,
-                        null,
-                        null
-                    ),
-                    new JSONPathSpec(true, ImmutableList.of()),
-                    ImmutableMap.of()
-                ),
-                StandardCharsets.UTF_8.name()
-            ),
-            Map.class
-        ),
-        new AggregatorFactory[]{new CountAggregatorFactory("rows")},
-        new UniformGranularitySpec(
-            Granularities.HOUR,
-            Granularities.NONE,
-            ImmutableList.of()
-        ),
+    return getTestableSupervisor(
+        replicas,
+        taskCount,
+        useEarliestOffset,
+        duration,
+        lateMessageRejectionPeriod,
+        earlyMessageRejectionPeriod,
+        false,
         null,
-        objectMapper
+        null
     );
   }
 
-  private static List<byte[]> JB(
-      String timestamp,
-      String dim1,
-      String dim2,
-      String dimLong,
-      String dimFloat,
-      String met1
+  private KinesisSupervisor getTestableSupervisor(
+      int replicas,
+      int taskCount,
+      boolean useEarliestOffset,
+      String duration,
+      Period lateMessageRejectionPeriod,
+      Period earlyMessageRejectionPeriod,
+      boolean suspended,
+      Integer recordsPerFetch,
+      Integer fetchDelayMillis
   )
   {
-    try {
-      return Collections.singletonList(new ObjectMapper().writeValueAsBytes(
-          ImmutableMap.builder()
-                      .put("timestamp", timestamp)
-                      .put("dim1", dim1)
-                      .put("dim2", dim2)
-                      .put("dimLong", dimLong)
-                      .put("dimFloat", dimFloat)
-                      .put("met1", met1)
-                      .build()
-      ));
-    }
-    catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
+    KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
+        stream,
+        "awsEndpoint",
+        null,
+        replicas,
+        taskCount,
+        new Period(duration),
+        new Period("P1D"),
+        new Period("PT30S"),
+        useEarliestOffset,
+        new Period("PT30M"),
+        lateMessageRejectionPeriod,
+        earlyMessageRejectionPeriod,
+        recordsPerFetch,
+        fetchDelayMillis,
+        null,
+        null,
+        false
+    );
+
+    KinesisIndexTaskClientFactory taskClientFactory = new KinesisIndexTaskClientFactory(
+        null,
+        null
+    )
+    {
+      @Override
+      public KinesisIndexTaskClient build(
+          TaskInfoProvider taskInfoProvider,
+          String dataSource,
+          int numThreads,
+          Duration httpTimeout,
+          long numRetries
+      )
+      {
+        Assert.assertEquals(TEST_CHAT_THREADS, numThreads);
+        Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), httpTimeout);
+        Assert.assertEquals(TEST_CHAT_RETRIES, numRetries);
+        return taskClient;
+      }
+    };
+
+    return new TestableKinesisSupervisor(
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        taskClientFactory,
+        objectMapper,
+        new KinesisSupervisorSpec(
+            dataSchema,
+            tuningConfig,
+            kinesisSupervisorIOConfig,
+            null,
+            suspended,
+            taskStorage,
+            taskMaster,
+            indexerMetadataStorageCoordinator,
+            taskClientFactory,
+            objectMapper,
+            new NoopServiceEmitter(),
+            new DruidMonitorSchedulerConfig(),
+            rowIngestionMetersFactory,
+            null
+        ),
+        rowIngestionMetersFactory
+    );
   }
 
-  private KinesisIndexTask createKinesisIndexTask(
-      String id,
-      String dataSource,
-      int taskGroupId,
-      SeekableStreamStartSequenceNumbers<String, String> startPartitions,
-      SeekableStreamEndSequenceNumbers<String, String> endPartitions,
-      DateTime minimumMessageTime,
-      DateTime maximumMessageTime
+  /**
+   * Use when you want to mock the return value of SeekableStreamSupervisor#isTaskCurrent()
+   */
+  private KinesisSupervisor getTestableSupervisorCustomIsTaskCurrent(
+      int replicas,
+      int taskCount,
+      boolean useEarliestOffset,
+      String duration,
+      Period lateMessageRejectionPeriod,
+      Period earlyMessageRejectionPeriod,
+      boolean suspended,
+      Integer recordsPerFetch,
+      Integer fetchDelayMillis,
+      boolean isTaskCurrentReturn
   )
   {
-    return new KinesisIndexTask(
-        id,
+    KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
+        stream,
+        "awsEndpoint",
         null,
-        getDataSchema(dataSource),
-        tuningConfig,
-        new KinesisIndexTaskIOConfig(
-            null,
-            "sequenceName-" + taskGroupId,
-            startPartitions,
-            endPartitions,
-            true,
-            minimumMessageTime,
-            maximumMessageTime,
-            "awsEndpoint",
-            null,
-            null,
-            null,
-            null,
-            false
-        ),
-        Collections.emptyMap(),
+        replicas,
+        taskCount,
+        new Period(duration),
+        new Period("P1D"),
+        new Period("PT30S"),
+        useEarliestOffset,
+        new Period("PT30M"),
+        lateMessageRejectionPeriod,
+        earlyMessageRejectionPeriod,
+        recordsPerFetch,
+        fetchDelayMillis,
         null,
         null,
-        rowIngestionMetersFactory,
+        false
+    );
+
+    KinesisIndexTaskClientFactory taskClientFactory = new KinesisIndexTaskClientFactory(
+        null,
         null
+    )
+    {
+      @Override
+      public KinesisIndexTaskClient build(
+          TaskInfoProvider taskInfoProvider,
+          String dataSource,
+          int numThreads,
+          Duration httpTimeout,
+          long numRetries
+      )
+      {
+        Assert.assertEquals(TEST_CHAT_THREADS, numThreads);
+        Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), httpTimeout);
+        Assert.assertEquals(TEST_CHAT_RETRIES, numRetries);
+        return taskClient;
+      }
+    };
+
+    return new TestableKinesisSupervisorWithCustomIsTaskCurrent(
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        taskClientFactory,
+        objectMapper,
+        new KinesisSupervisorSpec(
+            dataSchema,
+            tuningConfig,
+            kinesisSupervisorIOConfig,
+            null,
+            suspended,
+            taskStorage,
+            taskMaster,
+            indexerMetadataStorageCoordinator,
+            taskClientFactory,
+            objectMapper,
+            new NoopServiceEmitter(),
+            new DruidMonitorSchedulerConfig(),
+            rowIngestionMetersFactory,
+            null
+        ),
+        rowIngestionMetersFactory,
+        isTaskCurrentReturn
     );
   }
 
+  /**
+   * Use for tests where you don't want generateSequenceName to be overridden
+   */
   private KinesisSupervisor getSupervisor(
       int replicas,
       int taskCount,
@@ -3580,7 +3971,11 @@ public class KinesisSupervisorTest extends EasyMockSupport
       String duration,
       Period lateMessageRejectionPeriod,
       Period earlyMessageRejectionPeriod,
-      boolean suspended
+      boolean suspended,
+      Integer recordsPerFetch,
+      Integer fetchDelayMillis,
+      DataSchema dataSchema,
+      KinesisSupervisorTuningConfig tuningConfig
   )
   {
     KinesisSupervisorIOConfig KinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
@@ -3596,8 +3991,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
         new Period("PT30M"),
         lateMessageRejectionPeriod,
         earlyMessageRejectionPeriod,
-        null,
-        null,
+        recordsPerFetch,
+        fetchDelayMillis,
         null,
         null,
         false
@@ -3624,7 +4019,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
       }
     };
 
-    return new TestableKinesisSupervisor(
+    return new KinesisSupervisor(
         taskStorage,
         taskMaster,
         indexerMetadataStorageCoordinator,
@@ -3646,7 +4041,126 @@ public class KinesisSupervisorTest extends EasyMockSupport
             rowIngestionMetersFactory,
             null
         ),
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        null
+    );
+  }
+
+  private static DataSchema getDataSchema(String dataSource)
+  {
+    List<DimensionSchema> dimensions = new ArrayList<>();
+    dimensions.add(StringDimensionSchema.create("dim1"));
+    dimensions.add(StringDimensionSchema.create("dim2"));
+
+    return new DataSchema(
+        dataSource,
+        objectMapper.convertValue(
+            new StringInputRowParser(
+                new JSONParseSpec(
+                    new TimestampSpec("timestamp", "iso", null),
+                    new DimensionsSpec(
+                        dimensions,
+                        null,
+                        null
+                    ),
+                    new JSONPathSpec(true, ImmutableList.of()),
+                    ImmutableMap.of()
+                ),
+                StandardCharsets.UTF_8.name()
+            ),
+            Map.class
+        ),
+        new AggregatorFactory[]{new CountAggregatorFactory("rows")},
+        new UniformGranularitySpec(
+            Granularities.HOUR,
+            Granularities.NONE,
+            ImmutableList.of()
+        ),
+        null,
+        objectMapper
+    );
+  }
+
+  private KinesisIndexTask createKinesisIndexTask(
+      String id,
+      String dataSource,
+      int taskGroupId,
+      SeekableStreamStartSequenceNumbers<String, String> startPartitions,
+      SeekableStreamEndSequenceNumbers<String, String> endPartitions,
+      DateTime minimumMessageTime,
+      DateTime maximumMessageTime
+  )
+  {
+    return createKinesisIndexTask(
+        id,
+        taskGroupId,
+        startPartitions,
+        endPartitions,
+        minimumMessageTime,
+        maximumMessageTime,
+        getDataSchema(dataSource)
+    );
+  }
+
+  private KinesisIndexTask createKinesisIndexTask(
+      String id,
+      int taskGroupId,
+      SeekableStreamStartSequenceNumbers<String, String> startPartitions,
+      SeekableStreamEndSequenceNumbers<String, String> endPartitions,
+      DateTime minimumMessageTime,
+      DateTime maximumMessageTime,
+      DataSchema dataSchema
+  )
+  {
+    return createKinesisIndexTask(
+        id,
+        taskGroupId,
+        startPartitions,
+        endPartitions,
+        minimumMessageTime,
+        maximumMessageTime,
+        dataSchema,
+        (KinesisIndexTaskTuningConfig) tuningConfig.convertToTaskTuningConfig()
+    );
+  }
+
+  private KinesisIndexTask createKinesisIndexTask(
+      String id,
+      int taskGroupId,
+      SeekableStreamStartSequenceNumbers<String, String> startPartitions,
+      SeekableStreamEndSequenceNumbers<String, String> endPartitions,
+      DateTime minimumMessageTime,
+      DateTime maximumMessageTime,
+      DataSchema dataSchema,
+      KinesisIndexTaskTuningConfig tuningConfig
+  )
+  {
+    return new KinesisIndexTask(
+        id,
+        null,
+        dataSchema,
+        tuningConfig,
+        new KinesisIndexTaskIOConfig(
+            0,
+            "sequenceName-" + taskGroupId,
+            startPartitions,
+            endPartitions,
+            true,
+            minimumMessageTime,
+            maximumMessageTime,
+            "awsEndpoint",
+            null,
+            null,
+            null,
+            null,
+            false
+        ),
+        Collections.emptyMap(),
+        null,
+        null,
+        rowIngestionMetersFactory,
+        null
     );
   }
 
@@ -3715,7 +4229,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
     protected String generateSequenceName(
         Map<String, String> startPartitions,
         Optional<DateTime> minimumMessageTime,
-        Optional<DateTime> maximumMessageTime
+        Optional<DateTime> maximumMessageTime,
+        DataSchema dataSchema,
+        SeekableStreamIndexTaskTuningConfig tuningConfig
     )
     {
       final int groupId = getTaskGroupIdForPartition(startPartitions.keySet().iterator().next());
@@ -3727,7 +4243,39 @@ public class KinesisSupervisorTest extends EasyMockSupport
     {
       return supervisorRecordSupplier;
     }
+  }
 
+  private class TestableKinesisSupervisorWithCustomIsTaskCurrent extends TestableKinesisSupervisor
+  {
+    private final boolean isTaskCurrentReturn;
 
+    public TestableKinesisSupervisorWithCustomIsTaskCurrent(
+        TaskStorage taskStorage,
+        TaskMaster taskMaster,
+        IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
+        KinesisIndexTaskClientFactory taskClientFactory,
+        ObjectMapper mapper,
+        KinesisSupervisorSpec spec,
+        RowIngestionMetersFactory rowIngestionMetersFactory,
+        boolean isTaskCurrentReturn
+    )
+    {
+      super(
+          taskStorage,
+          taskMaster,
+          indexerMetadataStorageCoordinator,
+          taskClientFactory,
+          mapper,
+          spec,
+          rowIngestionMetersFactory
+      );
+      this.isTaskCurrentReturn = isTaskCurrentReturn;
+    }
+
+    @Override
+    public boolean isTaskCurrent(int taskGroupId, String taskId)
+    {
+      return isTaskCurrentReturn;
+    }
   }
 }
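
For readers skimming the test diff above: TestableKinesisSupervisorWithCustomIsTaskCurrent is an instance of the common "testable subclass" seam, where the class under test is subclassed and one decision method is overridden with a canned return value, so callers of that method can be driven down either branch without wiring up task storage. A minimal, self-contained sketch of the pattern, with invented names and no Druid dependencies:

    // Sketch of the "testable subclass" seam; all names here are illustrative,
    // not Druid classes.
    public class TestableSubclassSketch
    {
      static class Supervisor
      {
        // Decision method that normally consults task storage and recomputes hashes.
        boolean isTaskCurrent(int taskGroupId, String taskId)
        {
          throw new UnsupportedOperationException("not exercised in this sketch");
        }

        // Production logic that branches on the decision method.
        String decide(int taskGroupId, String taskId)
        {
          return isTaskCurrent(taskGroupId, taskId) ? "keep" : "kill";
        }
      }

      static class TestableSupervisor extends Supervisor
      {
        private final boolean isTaskCurrentReturn;

        TestableSupervisor(boolean isTaskCurrentReturn)
        {
          this.isTaskCurrentReturn = isTaskCurrentReturn;
        }

        @Override
        boolean isTaskCurrent(int taskGroupId, String taskId)
        {
          // Canned answer lets a test drive either branch without any storage setup.
          return isTaskCurrentReturn;
        }
      }

      public static void main(String[] args)
      {
        System.out.println(new TestableSupervisor(true).decide(42, "id1"));  // keep
        System.out.println(new TestableSupervisor(false).decide(42, "id1")); // kill
      }
    }
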
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
new file mode 100644
index 0000000..2485e97
--- /dev/null
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kinesis.test;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.indexing.kinesis.KinesisIndexTaskTuningConfig;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.io.File;
+
+@JsonTypeName("KinesisTuningConfig")
+public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTuningConfig
+{
+  private final String extra;
+
+  @JsonCreator
+  public TestModifiedKinesisIndexTaskTuningConfig(
+      @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
+      @JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
+      @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
+      @JsonProperty("maxTotalRows") Long maxTotalRows,
+      @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
+      @JsonProperty("basePersistDirectory") File basePersistDirectory,
+      @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
+      @JsonProperty("indexSpec") IndexSpec indexSpec,
+      @JsonProperty("buildV9Directly") Boolean buildV9Directly,
+      @JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
+      @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
+      @JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically,
+      @JsonProperty("skipSequenceNumberAvailabilityCheck") Boolean skipSequenceNumberAvailabilityCheck,
+      @JsonProperty("recordBufferSize") Integer recordBufferSize,
+      @JsonProperty("recordBufferOfferTimeout") Integer recordBufferOfferTimeout,
+      @JsonProperty("recordBufferFullWait") Integer recordBufferFullWait,
+      @JsonProperty("fetchSequenceNumberTimeout") Integer fetchSequenceNumberTimeout,
+      @JsonProperty("fetchThreads") Integer fetchThreads,
+      @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
+      @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
+      @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
+      @JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll,
+      @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod,
+      @JsonProperty("extra") String extra
+  )
+  {
+    super(
+        maxRowsInMemory,
+        maxBytesInMemory,
+        maxRowsPerSegment,
+        maxTotalRows,
+        intermediatePersistPeriod,
+        basePersistDirectory,
+        maxPendingPersists,
+        indexSpec,
+        buildV9Directly,
+        reportParseExceptions,
+        handoffConditionTimeout,
+        resetOffsetAutomatically,
+        skipSequenceNumberAvailabilityCheck,
+        recordBufferSize,
+        recordBufferOfferTimeout,
+        recordBufferFullWait,
+        fetchSequenceNumberTimeout,
+        fetchThreads,
+        segmentWriteOutMediumFactory,
+        logParseExceptions,
+        maxParseExceptions,
+        maxSavedParseExceptions,
+        maxRecordsPerPoll,
+        intermediateHandoffPeriod
+    );
+    this.extra = extra;
+  }
+
+  public TestModifiedKinesisIndexTaskTuningConfig(KinesisIndexTaskTuningConfig base, String extra)
+  {
+    super(
+        base.getMaxRowsInMemory(),
+        base.getMaxBytesInMemory(),
+        base.getMaxRowsPerSegment(),
+        base.getMaxTotalRows(),
+        base.getIntermediatePersistPeriod(),
+        base.getBasePersistDirectory(),
+        base.getMaxPendingPersists(),
+        base.getIndexSpec(),
+        base.getBuildV9Directly(),
+        base.isReportParseExceptions(),
+        base.getHandoffConditionTimeout(),
+        base.isResetOffsetAutomatically(),
+        base.isSkipSequenceNumberAvailabilityCheck(),
+        base.getRecordBufferSize(),
+        base.getRecordBufferOfferTimeout(),
+        base.getRecordBufferFullWait(),
+        base.getFetchSequenceNumberTimeout(),
+        base.getFetchThreads(),
+        base.getSegmentWriteOutMediumFactory(),
+        base.isLogParseExceptions(),
+        base.getMaxParseExceptions(),
+        base.getMaxSavedParseExceptions(),
+        base.getMaxRecordsPerPoll(),
+        base.getIntermediateHandoffPeriod()
+    );
+    this.extra = extra;
+  }
+
+  @JsonProperty("extra")
+  public String getExtra()
+  {
+    return extra;
+  }
+}
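
TestModifiedKinesisIndexTaskTuningConfig above plays the role of a newer class definition carrying one extra field, so serializing with one class shape and deserializing with the other stands in for two Druid versions exchanging the same JSON. A generic sketch of that round-trip technique, using invented Config classes rather than the real tuning configs:

    import com.fasterxml.jackson.databind.DeserializationFeature;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class SerdeCompatSketch
    {
      // Invented stand-ins for the real tuning config classes.
      public static class ConfigV1
      {
        public int maxRowsInMemory = 1000;
      }

      public static class ConfigV2 extends ConfigV1
      {
        public String extra; // field added in a "newer version"
      }

      public static void main(String[] args) throws Exception
      {
        ObjectMapper mapper = new ObjectMapper();

        // Added-field direction: old JSON (no "extra") read by the newer class.
        ConfigV2 upgraded =
            mapper.readValue(mapper.writeValueAsString(new ConfigV1()), ConfigV2.class);
        System.out.println(upgraded.extra == null); // true: missing field defaults to null

        // Removed-field direction: newer JSON read by the old class succeeds
        // only when unknown properties are tolerated.
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        ConfigV1 downgraded =
            mapper.readValue(mapper.writeValueAsString(new ConfigV2()), ConfigV1.class);
        System.out.println(downgraded.maxRowsInMemory); // 1000
      }
    }

Note how the test class above declares @JsonTypeName("KinesisTuningConfig"), so JSON tagged with the production type id deserializes into the modified class during these tests.
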
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index f904e85..10f97ea 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -74,6 +74,7 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.metadata.EntryExistsException;
+import org.apache.druid.segment.indexing.DataSchema;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
@@ -168,6 +169,31 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         @Nullable Set<PartitionIdType> exclusiveStartSequenceNumberPartitions
     )
     {
+      this(
+          groupId,
+          startingSequences,
+          minimumMessageTime,
+          maximumMessageTime,
+          exclusiveStartSequenceNumberPartitions,
+          generateSequenceName(
+              startingSequences,
+              minimumMessageTime,
+              maximumMessageTime,
+              spec.getDataSchema(),
+              taskTuningConfig
+          )
+      );
+    }
+
+    TaskGroup(
+        int groupId,
+        ImmutableMap<PartitionIdType, SequenceOffsetType> startingSequences,
+        Optional<DateTime> minimumMessageTime,
+        Optional<DateTime> maximumMessageTime,
+        Set<PartitionIdType> exclusiveStartSequenceNumberPartitions,
+        String baseSequenceName
+    )
+    {
       this.groupId = groupId;
       this.startingSequences = startingSequences;
       this.minimumMessageTime = minimumMessageTime;
@@ -176,7 +202,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       this.exclusiveStartSequenceNumberPartitions = exclusiveStartSequenceNumberPartitions != null
                                                     ? exclusiveStartSequenceNumberPartitions
                                                     : Collections.emptySet();
-      this.baseSequenceName = generateSequenceName(startingSequences, minimumMessageTime, maximumMessageTime);
+      this.baseSequenceName = baseSequenceName;
     }
 
     int addNewCheckpoint(Map<PartitionIdType, SequenceOffsetType> checkpoint)
@@ -1154,12 +1180,18 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
           }
         }
         if (metadataUpdateSuccess) {
-          resetMetadata.getSeekableStreamSequenceNumbers().getPartitionSequenceNumberMap().keySet().forEach(partition -> {
-            final int groupId = getTaskGroupIdForPartition(partition);
-            killTaskGroupForPartitions(ImmutableSet.of(partition), "DataSourceMetadata is updated while reset");
-            activelyReadingTaskGroups.remove(groupId);
-            partitionGroups.get(groupId).replaceAll((partitionId, sequence) -> getNotSetMarker());
-          });
+          resetMetadata.getSeekableStreamSequenceNumbers()
+                       .getPartitionSequenceNumberMap()
+                       .keySet()
+                       .forEach(partition -> {
+                         final int groupId = getTaskGroupIdForPartition(partition);
+                         killTaskGroupForPartitions(
+                             ImmutableSet.of(partition),
+                             "DataSourceMetadata is updated while reset"
+                         );
+                         activelyReadingTaskGroups.remove(groupId);
+                         partitionGroups.get(groupId).replaceAll((partitionId, sequence) -> getNotSetMarker());
+                       });
         } else {
           throw new ISE("Unable to reset metadata");
         }
@@ -1257,7 +1289,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
           futureTaskIds.add(taskId);
           futures.add(
               Futures.transform(
-                  taskClient.getStatusAsync(taskId), new Function<SeekableStreamIndexTaskRunner.Status, Boolean>()
+                  taskClient.getStatusAsync(taskId),
+                  new Function<SeekableStreamIndexTaskRunner.Status, Boolean>()
                   {
                     @Override
                     public Boolean apply(SeekableStreamIndexTaskRunner.Status status)
@@ -1271,7 +1304,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                                                  .keySet()
                                                  .forEach(
                                                      partition -> addDiscoveredTaskToPendingCompletionTaskGroups(
-                                                         getTaskGroupIdForPartition(partition),
+                                                         getTaskGroupIdForPartition(
+                                                             partition),
                                                          taskId,
                                                          seekableStreamIndexTask.getIOConfig()
                                                                                 .getStartSequenceNumbers()
@@ -1295,7 +1329,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                               succeeded = true;
                               SequenceOffsetType previousOffset = partitionOffsets.putIfAbsent(partition, sequence);
                               if (previousOffset != null
-                                  && (makeSequenceNumber(previousOffset).compareTo(makeSequenceNumber(sequence))) < 0) {
+                                  && (makeSequenceNumber(previousOffset)
+                                  .compareTo(makeSequenceNumber(
+                                      sequence))) < 0) {
                                 succeeded = partitionOffsets.replace(partition, previousOffset, sequence);
                               }
                             } while (!succeeded);
@@ -1311,7 +1347,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                                   taskId
                               );
                               try {
-                                stopTask(taskId, false).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
+                                stopTask(taskId, false)
+                                    .get(futureTimeoutInSeconds, TimeUnit.SECONDS);
                               }
                               catch (InterruptedException | ExecutionException | TimeoutException e) {
                                 log.warn(e, "Exception while stopping task");
@@ -1327,7 +1364,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                                 taskId
                             );
                             try {
-                              stopTask(taskId, false).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
+                              stopTask(taskId, false)
+                                  .get(futureTimeoutInSeconds, TimeUnit.SECONDS);
                             }
                             catch (InterruptedException | ExecutionException | TimeoutException e) {
                               log.warn(e, "Exception while stopping task");
@@ -1338,6 +1376,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                                 taskGroupId,
                                 k -> {
                                   log.info("Creating a new task group for taskGroupId[%d]", taskGroupId);
+                                  // We reassign the task's original base sequence name (from the existing task) to the
+                                  // task group so that the replica segment allocations are the same.
                                   return new TaskGroup(
                                       taskGroupId,
                                       ImmutableMap.copyOf(
@@ -1349,7 +1389,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                                       seekableStreamIndexTask.getIOConfig().getMaximumMessageTime(),
                                       seekableStreamIndexTask.getIOConfig()
                                                              .getStartSequenceNumbers()
-                                                             .getExclusivePartitions()
+                                                             .getExclusivePartitions(),
+                                      seekableStreamIndexTask.getIOConfig().getBaseSequenceName()
                                   );
                                 }
                             );
@@ -1362,6 +1403,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                                   taskId
                               );
                             }
+                            verifySameSequenceNameForAllTasksInGroup(taskGroupId);
                           }
                         }
                         return true;
@@ -1371,7 +1413,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                         return null;
                       }
                     }
-
                   }, workerExec
               )
           );
@@ -1379,7 +1420,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       }
     }
 
-
     List<Boolean> results = Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
     for (int i = 0; i < results.size(); i++) {
       if (results.get(i) == null) {
@@ -1424,10 +1464,11 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     final List<String> taskIds = new ArrayList<>();
 
     for (String taskId : taskGroup.taskIds()) {
-      final ListenableFuture<TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>> checkpointsFuture = taskClient.getCheckpointsAsync(
-          taskId,
-          true
-      );
+      final ListenableFuture<TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>> checkpointsFuture =
+          taskClient.getCheckpointsAsync(
+              taskId,
+              true
+          );
       futures.add(checkpointsFuture);
       taskIds.add(taskId);
     }
@@ -1612,6 +1653,34 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     taskGroupList.add(newTaskGroup);
   }
 
+  // Sanity check to ensure that tasks have the same sequence name as their task group
+  private void verifySameSequenceNameForAllTasksInGroup(int groupId)
+  {
+    String taskGroupSequenceName = activelyReadingTaskGroups.get(groupId).baseSequenceName;
+    boolean allSequenceNamesMatch =
+        activelyReadingTaskGroups.get(groupId)
+            .tasks
+            .keySet()
+            .stream()
+            .allMatch(x -> {
+              Optional<Task> taskOptional = taskStorage.getTask(x);
+              if (!taskOptional.isPresent() || !doesTaskTypeMatchSupervisor(taskOptional.get())) {
+                // A missing task or a task of another type trivially fails the check.
+                return false;
+              }
+              @SuppressWarnings("unchecked")
+              SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType> task =
+                  (SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>) taskOptional.get();
+              return taskGroupSequenceName.equals(task.getIOConfig().getBaseSequenceName());
+            });
+    if (!allSequenceNamesMatch) {
+      throw new ISE(
+          "Base sequence names do not match for the tasks in the task group with ID [%s]",
+          groupId
+      );
+    }
+  }
+
   private ListenableFuture<Void> stopTask(final String id, final boolean publish)
   {
     return Futures.transform(
@@ -1631,7 +1700,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     );
   }
 
-  private boolean isTaskCurrent(int taskGroupId, String taskId)
+  @VisibleForTesting
+  public boolean isTaskCurrent(int taskGroupId, String taskId)
   {
     Optional<Task> taskOptional = taskStorage.getTask(taskId);
     if (!taskOptional.isPresent() || !doesTaskTypeMatchSupervisor(taskOptional.get())) {
@@ -1639,22 +1709,39 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     }
 
     @SuppressWarnings("unchecked")
-    SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType> task = (SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>) taskOptional
-        .get();
+    SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType> task =
+        (SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>) taskOptional.get();
+
+    // We recompute the task's sequence name from its data schema and tuning config using the current class
+    // definitions, and compare it against a sequence name recomputed from the supervisor's own configuration.
+    // Recomputing both sides ensures that forwards-compatible tasks won't be killed, which would happen if a
+    // hash generated by an older class definition were compared against one generated by the current definition.
+    String taskSequenceName = generateSequenceName(
+        task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap(),
+        task.getIOConfig().getMinimumMessageTime(),
+        task.getIOConfig().getMaximumMessageTime(),
+        task.getDataSchema(),
+        task.getTuningConfig()
+    );
 
-    String taskSequenceName = task.getIOConfig().getBaseSequenceName();
     if (activelyReadingTaskGroups.get(taskGroupId) != null) {
-      return Preconditions
-          .checkNotNull(activelyReadingTaskGroups.get(taskGroupId), "null taskGroup for taskId[%s]", taskGroupId)
-          .baseSequenceName
-          .equals(taskSequenceName);
+      TaskGroup taskGroup = activelyReadingTaskGroups.get(taskGroupId);
+      return generateSequenceName(
+          taskGroup.startingSequences,
+          taskGroup.minimumMessageTime,
+          taskGroup.maximumMessageTime,
+          spec.getDataSchema(),
+          taskTuningConfig
+      ).equals(taskSequenceName);
     } else {
       return generateSequenceName(
           task.getIOConfig()
               .getStartSequenceNumbers()
               .getPartitionSequenceNumberMap(),
           task.getIOConfig().getMinimumMessageTime(),
-          task.getIOConfig().getMaximumMessageTime()
+          task.getIOConfig().getMaximumMessageTime(),
+          spec.getDataSchema(),
+          taskTuningConfig
       ).equals(taskSequenceName);
     }
   }
@@ -1663,7 +1750,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   protected String generateSequenceName(
       Map<PartitionIdType, SequenceOffsetType> startPartitions,
       Optional<DateTime> minimumMessageTime,
-      Optional<DateTime> maximumMessageTime
+      Optional<DateTime> maximumMessageTime,
+      DataSchema dataSchema,
+      SeekableStreamIndexTaskTuningConfig tuningConfig
   )
   {
     StringBuilder sb = new StringBuilder();
@@ -1676,17 +1765,17 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     String minMsgTimeStr = (minimumMessageTime.isPresent() ? String.valueOf(minimumMessageTime.get().getMillis()) : "");
     String maxMsgTimeStr = (maximumMessageTime.isPresent() ? String.valueOf(maximumMessageTime.get().getMillis()) : "");
 
-    String dataSchema, tuningConfig;
+    String dataSchemaStr, tuningConfigStr;
     try {
-      dataSchema = sortingMapper.writeValueAsString(spec.getDataSchema());
-      tuningConfig = sortingMapper.writeValueAsString(taskTuningConfig);
+      dataSchemaStr = sortingMapper.writeValueAsString(dataSchema);
+      tuningConfigStr = sortingMapper.writeValueAsString(tuningConfig);
     }
     catch (JsonProcessingException e) {
       throw Throwables.propagate(e);
     }
 
-    String hashCode = DigestUtils.sha1Hex(dataSchema
-                                          + tuningConfig
+    String hashCode = DigestUtils.sha1Hex(dataSchemaStr
+                                          + tuningConfigStr
                                           + partitionOffsetStr
                                           + minMsgTimeStr
                                           + maxMsgTimeStr)
@@ -2283,7 +2372,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                 exclusiveStartSequenceNumberPartitions
             )
         );
-
       }
     }
 
@@ -2468,8 +2556,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         .get(groupId)
         .exclusiveStartSequenceNumberPartitions;
 
-    DateTime minimumMessageTime = activelyReadingTaskGroups.get(groupId).minimumMessageTime.orNull();
-    DateTime maximumMessageTime = activelyReadingTaskGroups.get(groupId).maximumMessageTime.orNull();
+    DateTime minimumMessageTime = group.minimumMessageTime.orNull();
+    DateTime maximumMessageTime = group.maximumMessageTime.orNull();
 
     SeekableStreamIndexTaskIOConfig newIoConfig = createTaskIoConfig(
         groupId,
@@ -2482,7 +2570,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         ioConfig
     );
 
-
     List<SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>> taskList = createIndexTasks(
         replicas,
         group.baseSequenceName,
@@ -2705,7 +2792,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * calculates the taskgroup id that the given partition belongs to.
    * different between Kafka/Kinesis since Kinesis uses String as partition id
    *
-   * @param partition paritition id
+   * @param partition partition id
    *
    * @return taskgroup id
    */
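
For context on the generateSequenceName() change above: the base sequence name embeds a SHA-1 over the sorted-JSON serializations of the data schema and tuning config, plus the partition offsets and message-time bounds, so any difference in how a class version serializes yields a different name. A simplified sketch of the scheme; the concatenation mirrors the code above, while the 15-character truncation is an assumption since that line is elided from this diff:

    import org.apache.commons.codec.digest.DigestUtils;

    public class SequenceNameSketch
    {
      // Illustrative only: hash the serialized inputs and keep a short prefix,
      // mirroring the inputs used by generateSequenceName() above.
      static String baseSequenceName(
          String dataSchemaJson,
          String tuningConfigJson,
          String partitionOffsets,
          String minMsgTimeMillis,
          String maxMsgTimeMillis
      )
      {
        return DigestUtils
            .sha1Hex(dataSchemaJson + tuningConfigJson + partitionOffsets + minMsgTimeMillis + maxMsgTimeMillis)
            .substring(0, 15); // truncation length is an assumption for this sketch
      }

      public static void main(String[] args)
      {
        String v1 = baseSequenceName("{\"dataSource\":\"test\"}", "{}", "0:100", "", "");
        String v2 = baseSequenceName("{\"dataSource\":\"test\",\"extra\":null}", "{}", "0:100", "", "");
        // The two schemas describe the same logical configuration but serialize
        // differently, so the hashes diverge. This is exactly the cross-version
        // problem that isTaskCurrent() now avoids by recomputing both sides with
        // the current class definitions.
        System.out.println(v1.equals(v2)); // false
      }
    }
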
diff --git a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
index 6c5c5d4..58a2c4b 100644
--- a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
+++ b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
@@ -48,6 +48,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
@@ -375,4 +376,82 @@ public class DataSchemaTest
         )
     );
   }
+
+  @Test
+  public void testSerdeWithUpdatedDataSchemaAddedField() throws IOException
+  {
+    Map<String, Object> parser = jsonMapper.convertValue(
+        new StringInputRowParser(
+            new JSONParseSpec(
+                new TimestampSpec("time", "auto", null),
+                new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dimB", "dimA")), null, null),
+                null,
+                null
+            ),
+            null
+        ), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+    );
+
+    DataSchema originalSchema = new DataSchema(
+        "test",
+        parser,
+        new AggregatorFactory[]{
+            new DoubleSumAggregatorFactory("metric1", "col1"),
+            new DoubleSumAggregatorFactory("metric2", "col2"),
+            },
+        new ArbitraryGranularitySpec(Granularities.DAY, ImmutableList.of(Intervals.of("2014/2015"))),
+        null,
+        jsonMapper
+    );
+
+    String serialized = jsonMapper.writeValueAsString(originalSchema);
+    TestModifiedDataSchema deserialized = jsonMapper.readValue(serialized, TestModifiedDataSchema.class);
+
+    Assert.assertNull(deserialized.getExtra());
+    Assert.assertEquals(originalSchema.getDataSource(), deserialized.getDataSource());
+    Assert.assertEquals(originalSchema.getGranularitySpec(), deserialized.getGranularitySpec());
+    Assert.assertEquals(originalSchema.getParser().getParseSpec(), deserialized.getParser().getParseSpec());
+    Assert.assertArrayEquals(originalSchema.getAggregators(), deserialized.getAggregators());
+    Assert.assertEquals(originalSchema.getTransformSpec(), deserialized.getTransformSpec());
+    Assert.assertEquals(originalSchema.getParserMap(), deserialized.getParserMap());
+  }
+
+  @Test
+  public void testSerdeWithUpdatedDataSchemaRemovedField() throws IOException
+  {
+    Map<String, Object> parser = jsonMapper.convertValue(
+        new StringInputRowParser(
+            new JSONParseSpec(
+                new TimestampSpec("time", "auto", null),
+                new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dimB", "dimA")), null, null),
+                null,
+                null
+            ),
+            null
+        ), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+    );
+
+    TestModifiedDataSchema originalSchema = new TestModifiedDataSchema(
+        "test",
+        parser,
+        new AggregatorFactory[]{
+            new DoubleSumAggregatorFactory("metric1", "col1"),
+            new DoubleSumAggregatorFactory("metric2", "col2"),
+            },
+        new ArbitraryGranularitySpec(Granularities.DAY, ImmutableList.of(Intervals.of("2014/2015"))),
+        null,
+        jsonMapper,
+        "some arbitrary string"
+    );
+
+    String serialized = jsonMapper.writeValueAsString(originalSchema);
+    DataSchema deserialized = jsonMapper.readValue(serialized, DataSchema.class);
+
+    Assert.assertEquals(originalSchema.getDataSource(), deserialized.getDataSource());
+    Assert.assertEquals(originalSchema.getGranularitySpec(), deserialized.getGranularitySpec());
+    Assert.assertEquals(originalSchema.getParser().getParseSpec(), deserialized.getParser().getParseSpec());
+    Assert.assertArrayEquals(originalSchema.getAggregators(), deserialized.getAggregators());
+    Assert.assertEquals(originalSchema.getTransformSpec(), deserialized.getTransformSpec());
+    Assert.assertEquals(originalSchema.getParserMap(), deserialized.getParserMap());
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/segment/indexing/TestModifiedDataSchema.java b/server/src/test/java/org/apache/druid/segment/indexing/TestModifiedDataSchema.java
new file mode 100644
index 0000000..ca030fe
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/segment/indexing/TestModifiedDataSchema.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.indexing;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.segment.transform.TransformSpec;
+
+import java.util.Map;
+
+public class TestModifiedDataSchema extends DataSchema
+{
+  private final String extra;
+
+  @JsonCreator
+  public TestModifiedDataSchema(
+      @JsonProperty("dataSource") String dataSource,
+      @JsonProperty("parser") Map<String, Object> parser,
+      @JsonProperty("metricsSpec") AggregatorFactory[] aggregators,
+      @JsonProperty("granularitySpec") GranularitySpec granularitySpec,
+      @JsonProperty("transformSpec") TransformSpec transformSpec,
+      @JacksonInject ObjectMapper jsonMapper,
+      @JsonProperty("extra") String extra
+  )
+  {
+    super(dataSource, parser, aggregators, granularitySpec, transformSpec, jsonMapper);
+    this.extra = extra;
+  }
+
+  @JsonProperty("extra")
+  public String getExtra()
+  {
+    return extra;
+  }
+}
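
A wiring note on the @JacksonInject parameter above: when a class like TestModifiedDataSchema is deserialized, the injected ObjectMapper is supplied from the mapper's InjectableValues rather than from the JSON document. A minimal sketch of that registration; the ObjectMapper.class key is an assumption, as Druid's own mapper setup is not shown here:

    import com.fasterxml.jackson.databind.InjectableValues;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class InjectableValuesSketch
    {
      public static void main(String[] args)
      {
        ObjectMapper mapper = new ObjectMapper();

        // Register the mapper itself so constructors declaring
        // "@JacksonInject ObjectMapper jsonMapper" can be satisfied during readValue().
        mapper.setInjectableValues(
            new InjectableValues.Std().addValue(ObjectMapper.class, mapper)
        );

        // Without a registered injectable value, deserializing such a class
        // typically fails at readValue() time with a mapping error.
      }
    }
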

