You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by jo...@apache.org on 2019/03/27 00:16:35 UTC

[incubator-druid] branch 0.14.0-incubating updated: Fix exclusive start partitions for sequenceMetadata (#7339) (#7347)

This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch 0.14.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.14.0-incubating by this push:
     new 1a29d4e  Fix exclusive start partitions for sequenceMetadata (#7339) (#7347)
1a29d4e is described below

commit 1a29d4e88349a002ec8356e64281a661d47271f1
Author: Jonathan Wei <jo...@users.noreply.github.com>
AuthorDate: Tue Mar 26 17:16:27 2019 -0700

    Fix exclusive start partitions for sequenceMetadata (#7339) (#7347)
    
    * Fix exclusvie start partitions for sequenceMetadata
    
    * add empty check
---
 .../indexing/kafka/supervisor/KafkaSupervisor.java |  25 ++--
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |  76 ++++--------
 .../kinesis/supervisor/KinesisSupervisor.java      |  25 ++--
 .../indexing/kinesis/KinesisIndexTaskTest.java     | 117 +++++++++++++++---
 .../actions/SegmentTransactionalInsertAction.java  |   5 +-
 .../SeekableStreamIndexTaskRunner.java             | 136 +++++++++++++++------
 .../indexing/seekablestream/SequenceMetadata.java  |   7 +-
 .../supervisor/SeekableStreamSupervisor.java       |  25 +++-
 .../supervisor/SeekableStreamSupervisorSpec.java   |   5 +-
 .../IndexerSQLMetadataStorageCoordinator.java      |   7 +-
 10 files changed, 272 insertions(+), 156 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 86639fe..bbc1639 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
@@ -80,6 +79,11 @@ import java.util.stream.Collectors;
  */
 public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
 {
+  public static final TypeReference<TreeMap<Integer, Map<Integer, Long>>> CHECKPOINTS_TYPE_REF =
+      new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
+      {
+      };
+
   private static final EmittingLogger log = new EmittingLogger(KafkaSupervisor.class);
   private static final long MINIMUM_GET_OFFSET_PERIOD_MILLIS = 5000;
   private static final long INITIAL_GET_OFFSET_DELAY_MILLIS = 15000;
@@ -230,20 +234,9 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
       RowIngestionMetersFactory rowIngestionMetersFactory
   ) throws JsonProcessingException
   {
-    final String checkpoints = sortingMapper.writerWithType(new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
-    {
-    }).writeValueAsString(sequenceOffsets);
-    final Map<String, Object> context = spec.getContext() == null
-                                        ? ImmutableMap.of(
-        "checkpoints",
-        checkpoints,
-        IS_INCREMENTAL_HANDOFF_SUPPORTED,
-        true
-    ) : ImmutableMap.<String, Object>builder()
-                                            .put("checkpoints", checkpoints)
-                                            .put(IS_INCREMENTAL_HANDOFF_SUPPORTED, true)
-                                            .putAll(spec.getContext())
-                                            .build();
+    final String checkpoints = sortingMapper.writerFor(CHECKPOINTS_TYPE_REF).writeValueAsString(sequenceOffsets);
+    final Map<String, Object> context = createBaseTaskContexts();
+    context.put(CHECKPOINTS_CTX_KEY, checkpoints);
 
     List<SeekableStreamIndexTask<Integer, Long>> taskList = new ArrayList<>();
     for (int i = 0; i < replicas; i++) {
@@ -359,7 +352,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
   }
 
   @Override
-  protected boolean useExclusiveStartSequenceNumberForStartSequence()
+  protected boolean useExclusiveStartSequenceNumberForNonFirstSequence()
   {
     return false;
   }
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 5a3f7dd..f11599f 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.kafka;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -68,6 +69,7 @@ import org.apache.druid.indexing.common.stats.RowIngestionMeters;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.IndexTaskTest;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
 import org.apache.druid.indexing.kafka.test.TestBroker;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
@@ -2217,9 +2219,10 @@ public class KafkaIndexTaskTest
     // and this task should start reading from offset 2 for partition 0
     sequences.put(1, ImmutableMap.of(0, 2L));
     final Map<String, Object> context = new HashMap<>();
-    context.put("checkpoints", objectMapper.writerWithType(new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
-    {
-    }).writeValueAsString(sequences));
+    context.put(
+        SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY,
+        objectMapper.writerFor(KafkaSupervisor.CHECKPOINTS_TYPE_REF).writeValueAsString(sequences)
+    );
 
     final KafkaIndexTask task = createTask(
         null,
@@ -2404,7 +2407,7 @@ public class KafkaIndexTaskTest
   private KafkaIndexTask createTask(
       final String taskId,
       final KafkaIndexTaskIOConfig ioConfig
-  )
+  ) throws JsonProcessingException
   {
     return createTask(taskId, DATA_SCHEMA, ioConfig);
   }
@@ -2413,7 +2416,7 @@ public class KafkaIndexTaskTest
       final String taskId,
       final KafkaIndexTaskIOConfig ioConfig,
       final Map<String, Object> context
-  )
+  ) throws JsonProcessingException
   {
     return createTask(taskId, DATA_SCHEMA, ioConfig, context);
   }
@@ -2422,62 +2425,24 @@ public class KafkaIndexTaskTest
       final String taskId,
       final DataSchema dataSchema,
       final KafkaIndexTaskIOConfig ioConfig
-  )
+  ) throws JsonProcessingException
   {
-    final KafkaIndexTaskTuningConfig tuningConfig = new KafkaIndexTaskTuningConfig(
-        1000,
-        null,
-        maxRowsPerSegment,
-        maxTotalRows,
-        new Period("P1Y"),
-        null,
-        null,
-        null,
-        true,
-        reportParseExceptions,
-        handoffConditionTimeout,
-        resetOffsetAutomatically,
-        null,
-        intermediateHandoffPeriod,
-        logParseExceptions,
-        maxParseExceptions,
-        maxSavedParseExceptions
-    );
-    final Map<String, Object> context = isIncrementalHandoffSupported
-                                        ? ImmutableMap.of(
-        SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED,
-        true
-    )
-                                        : null;
-    final KafkaIndexTask task = new KafkaIndexTask(
-        taskId,
-        null,
-        cloneDataSchema(dataSchema),
-        tuningConfig,
-        ioConfig,
-        context,
-        null,
-        null,
-        rowIngestionMetersFactory,
-        objectMapper
-    );
-    task.setPollRetryMs(POLL_RETRY_MS);
-    return task;
+    final Map<String, Object> context = new HashMap<>();
+    return createTask(taskId, dataSchema, ioConfig, context);
   }
 
-
   private KafkaIndexTask createTask(
       final String taskId,
       final DataSchema dataSchema,
       final KafkaIndexTaskIOConfig ioConfig,
       final Map<String, Object> context
-  )
+  ) throws JsonProcessingException
   {
     final KafkaIndexTaskTuningConfig tuningConfig = new KafkaIndexTaskTuningConfig(
         1000,
         null,
         maxRowsPerSegment,
-        null,
+        maxTotalRows,
         new Period("P1Y"),
         null,
         null,
@@ -2487,13 +2452,22 @@ public class KafkaIndexTaskTest
         handoffConditionTimeout,
         resetOffsetAutomatically,
         null,
-        null,
+        intermediateHandoffPeriod,
         logParseExceptions,
         maxParseExceptions,
         maxSavedParseExceptions
     );
     if (isIncrementalHandoffSupported) {
       context.put(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true);
+
+      if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
+        final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
+        checkpoints.put(0, ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap());
+        final String checkpointsJson = objectMapper
+            .writerFor(KafkaSupervisor.CHECKPOINTS_TYPE_REF)
+            .writeValueAsString(checkpoints);
+        context.put(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY, checkpointsJson);
+      }
     }
 
     final KafkaIndexTask task = new KafkaIndexTask(
@@ -2562,8 +2536,8 @@ public class KafkaIndexTaskTest
       objectMapper.registerModule(module);
     }
     final TaskConfig taskConfig = new TaskConfig(
-        new File(directory, "taskBaseDir").getPath(),
-        null,
+        new File(directory, "baseDir").getPath(),
+        new File(directory, "baseTaskDir").getPath(),
         null,
         50000,
         null,
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index 332e693..870d7ec 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -75,6 +75,11 @@ import java.util.concurrent.ScheduledExecutorService;
  */
 public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
 {
+  public static final TypeReference<TreeMap<Integer, Map<String, String>>> CHECKPOINTS_TYPE_REF =
+      new TypeReference<TreeMap<Integer, Map<String, String>>>()
+      {
+      };
+
   private static final String NOT_SET = "-1";
   private final KinesisSupervisorSpec spec;
   private final AWSCredentialsConfig awsCredentialsConfig;
@@ -151,20 +156,10 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
       RowIngestionMetersFactory rowIngestionMetersFactory
   ) throws JsonProcessingException
   {
-    final String checkpoints = sortingMapper.writerFor(new TypeReference<TreeMap<Integer, Map<String, String>>>()
-    {
-    }).writeValueAsString(sequenceOffsets);
-    final Map<String, Object> context = spec.getContext() == null
-                                        ? ImmutableMap.of(
-        "checkpoints",
-        checkpoints,
-        IS_INCREMENTAL_HANDOFF_SUPPORTED,
-        true
-    ) : ImmutableMap.<String, Object>builder()
-                                            .put("checkpoints", checkpoints)
-                                            .put(IS_INCREMENTAL_HANDOFF_SUPPORTED, true)
-                                            .putAll(spec.getContext())
-                                            .build();
+    final String checkpoints = sortingMapper.writerFor(CHECKPOINTS_TYPE_REF).writeValueAsString(sequenceOffsets);
+    final Map<String, Object> context = createBaseTaskContexts();
+    context.put(CHECKPOINTS_CTX_KEY, checkpoints);
+
     List<SeekableStreamIndexTask<String, String>> taskList = new ArrayList<>();
     for (int i = 0; i < replicas; i++) {
       String taskId = Joiner.on("_").join(baseSequenceName, RandomIdUtils.getRandomId());
@@ -313,7 +308,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
   }
 
   @Override
-  protected boolean useExclusiveStartSequenceNumberForStartSequence()
+  protected boolean useExclusiveStartSequenceNumberForNonFirstSequence()
   {
     return true;
   }
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 127b3d9..0fbf59a 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.Module;
@@ -74,6 +75,7 @@ import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.IndexTaskTest;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisor;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.MetadataTaskStorage;
@@ -83,6 +85,7 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
 import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
 import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.SequenceMetadata;
 import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
@@ -181,6 +184,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -512,7 +516,6 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     Assert.assertEquals(ImmutableList.of("g"), readSegmentColumn("dim1", desc2));
   }
 
-
   @Test(timeout = 120_000L)
   public void testIncrementalHandOff() throws Exception
   {
@@ -739,7 +742,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
                 DATA_SCHEMA.getDataSource(),
                 0,
                 new KinesisDataSourceMetadata(
-                    new SeekableStreamStartSequenceNumbers<>(stream, currentOffsets, ImmutableSet.of())
+                    new SeekableStreamStartSequenceNumbers<>(stream, currentOffsets, currentOffsets.keySet())
                 ),
                 new KinesisDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(stream, nextOffsets))
             )
@@ -1952,7 +1955,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
         new KinesisIndexTaskIOConfig(
             null,
             "sequence0",
-            new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of(shardId1)),
+            new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "5")),
             true,
             null,
@@ -2050,10 +2053,9 @@ public class KinesisIndexTaskTest extends EasyMockSupport
         )
     );
 
-    final SeekableStreamStartSequenceNumbers<String, String> checkpoint1 = new SeekableStreamStartSequenceNumbers<>(
+    final SeekableStreamEndSequenceNumbers<String, String> checkpoint1 = new SeekableStreamEndSequenceNumbers<>(
         stream,
-        ImmutableMap.of(shardId1, "4"),
-        ImmutableSet.of()
+        ImmutableMap.of(shardId1, "4")
     );
 
     final ListenableFuture<TaskStatus> future1 = runTask(task1);
@@ -2098,7 +2100,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
         new KinesisIndexTaskIOConfig(
             null,
             "sequence0",
-            new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of(shardId1)),
+            new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "6")),
             true,
             null,
@@ -2274,9 +2276,10 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     // and this task should start reading from offset 2 for partition 0 (not offset 1, because end is inclusive)
     sequences.put(1, ImmutableMap.of(shardId1, "1"));
     final Map<String, Object> context = new HashMap<>();
-    context.put("checkpoints", objectMapper.writerWithType(new TypeReference<TreeMap<Integer, Map<String, String>>>()
-    {
-    }).writeValueAsString(sequences));
+    context.put(
+        SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY,
+        objectMapper.writerFor(KinesisSupervisor.CHECKPOINTS_TYPE_REF).writeValueAsString(sequences)
+    );
 
 
     final KinesisIndexTask task = createTask(
@@ -2476,6 +2479,75 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     );
   }
 
+  @Test
+  public void testSequencesFromContext() throws IOException
+  {
+    final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
+    // Here the sequence number is 1 meaning that one incremental handoff was done by the failed task
+    // and this task should start reading from offset 2 for partition 0 (not offset 1, because end is inclusive)
+    checkpoints.put(0, ImmutableMap.of(shardId0, "0", shardId1, "0"));
+    checkpoints.put(1, ImmutableMap.of(shardId0, "0", shardId1, "1"));
+    checkpoints.put(2, ImmutableMap.of(shardId0, "1", shardId1, "3"));
+    final Map<String, Object> context = new HashMap<>();
+    context.put("checkpoints", objectMapper.writerFor(KinesisSupervisor.CHECKPOINTS_TYPE_REF)
+                                           .writeValueAsString(checkpoints));
+
+    final KinesisIndexTask task = createTask(
+        "task1",
+        DATA_SCHEMA,
+        new KinesisIndexTaskIOConfig(
+            null,
+            "sequence0",
+            new SeekableStreamStartSequenceNumbers<>(
+                stream,
+                ImmutableMap.of(shardId0, "0", shardId1, "0"),
+                ImmutableSet.of(shardId0)
+            ),
+            new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "1", shardId1, "5")),
+            true,
+            null,
+            null,
+            "awsEndpoint",
+            null,
+            null,
+            null,
+            null,
+            false
+        ),
+        context
+    );
+
+    task.getRunner().setToolbox(toolboxFactory.build(task));
+    task.getRunner().initializeSequences();
+    final CopyOnWriteArrayList<SequenceMetadata<String, String>> sequences = task.getRunner().getSequences();
+
+    Assert.assertEquals(3, sequences.size());
+
+    SequenceMetadata<String, String> sequenceMetadata = sequences.get(0);
+    Assert.assertEquals(checkpoints.get(0), sequenceMetadata.getStartOffsets());
+    Assert.assertEquals(checkpoints.get(1), sequenceMetadata.getEndOffsets());
+    Assert.assertEquals(
+        task.getIOConfig().getStartSequenceNumbers().getExclusivePartitions(),
+        sequenceMetadata.getExclusiveStartPartitions()
+    );
+    Assert.assertTrue(sequenceMetadata.isCheckpointed());
+
+    sequenceMetadata = sequences.get(1);
+    Assert.assertEquals(checkpoints.get(1), sequenceMetadata.getStartOffsets());
+    Assert.assertEquals(checkpoints.get(2), sequenceMetadata.getEndOffsets());
+    Assert.assertEquals(checkpoints.get(1).keySet(), sequenceMetadata.getExclusiveStartPartitions());
+    Assert.assertTrue(sequenceMetadata.isCheckpointed());
+
+    sequenceMetadata = sequences.get(2);
+    Assert.assertEquals(checkpoints.get(2), sequenceMetadata.getStartOffsets());
+    Assert.assertEquals(
+        task.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap(),
+        sequenceMetadata.getEndOffsets()
+    );
+    Assert.assertEquals(checkpoints.get(2).keySet(), sequenceMetadata.getExclusiveStartPartitions());
+    Assert.assertFalse(sequenceMetadata.isCheckpointed());
+  }
+
   private ListenableFuture<TaskStatus> runTask(final Task task)
   {
     try {
@@ -2525,7 +2597,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
   private KinesisIndexTask createTask(
       final String taskId,
       final KinesisIndexTaskIOConfig ioConfig
-  )
+  ) throws JsonProcessingException
   {
     return createTask(taskId, DATA_SCHEMA, ioConfig, null);
   }
@@ -2534,7 +2606,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
       final String taskId,
       final DataSchema dataSchema,
       final KinesisIndexTaskIOConfig ioConfig
-  )
+  ) throws JsonProcessingException
   {
     return createTask(taskId, dataSchema, ioConfig, null);
   }
@@ -2544,7 +2616,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
       final DataSchema dataSchema,
       final KinesisIndexTaskIOConfig ioConfig,
       @Nullable final Map<String, Object> context
-  )
+  ) throws JsonProcessingException
   {
     final KinesisIndexTaskTuningConfig tuningConfig = new KinesisIndexTaskTuningConfig(
         maxRowsInMemory,
@@ -2581,13 +2653,22 @@ public class KinesisIndexTaskTest extends EasyMockSupport
       final KinesisIndexTaskIOConfig ioConfig,
       final KinesisIndexTaskTuningConfig tuningConfig,
       @Nullable final Map<String, Object> context
-  )
+  ) throws JsonProcessingException
   {
     if (context != null) {
       context.put(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true);
+
+      if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
+        final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
+        checkpoints.put(0, ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap());
+        final String checkpointsJson = objectMapper
+            .writerFor(KinesisSupervisor.CHECKPOINTS_TYPE_REF)
+            .writeValueAsString(checkpoints);
+        context.put(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY, checkpointsJson);
+      }
     }
 
-    final KinesisIndexTask task = new TestableKinesisIndexTask(
+    return new TestableKinesisIndexTask(
         taskId,
         null,
         cloneDataSchema(dataSchema),
@@ -2599,8 +2680,6 @@ public class KinesisIndexTaskTest extends EasyMockSupport
         rowIngestionMetersFactory,
         null
     );
-
-    return task;
   }
 
   private static DataSchema cloneDataSchema(final DataSchema dataSchema)
@@ -2660,8 +2739,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
       objectMapper.registerModule(module);
     }
     final TaskConfig taskConfig = new TaskConfig(
-        new File(directory, "taskBaseDir").getPath(),
-        null,
+        new File(directory, "baseDir").getPath(),
+        new File(directory, "baseTaskDir").getPath(),
         null,
         50000,
         null,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
index 8a3c713..3a0aa76 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
 import org.apache.druid.indexing.common.task.IndexTaskUtils;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.CriticalAction;
@@ -154,8 +153,8 @@ public class SegmentTransactionalInsertAction implements TaskAction<SegmentPubli
   @Override
   public String toString()
   {
-    return "SegmentInsertAction{" +
-           "segments=" + Iterables.transform(segments, DataSegment::getId) +
+    return "SegmentTransactionalInsertAction{" +
+           "segments=" + segments +
            ", startMetadata=" + startMetadata +
            ", endMetadata=" + endMetadata +
            '}';
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 3907d54..c07118a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -59,6 +59,7 @@ import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecor
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
@@ -239,7 +240,6 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     resetNextCheckpointTime();
   }
 
-
   public TaskStatus run(TaskToolbox toolbox)
   {
     try {
@@ -256,57 +256,90 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     }
   }
 
-  private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
+  private Set<PartitionIdType> computeExclusiveStartPartitionsForSequence(
+      Map<PartitionIdType, SequenceOffsetType> sequenceStartOffsets
+  )
+  {
+    if (sequenceStartOffsets.equals(ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap())) {
+      return ioConfig.getStartSequenceNumbers().getExclusivePartitions();
+    } else {
+      return isEndOffsetExclusive() ? Collections.emptySet() : sequenceStartOffsets.keySet();
+    }
+  }
+
+  @VisibleForTesting
+  public void setToolbox(TaskToolbox toolbox)
   {
-    log.info("SeekableStream indexing task starting up!");
-    startTime = DateTimes.nowUtc();
-    status = Status.STARTING;
     this.toolbox = toolbox;
+  }
 
+  @VisibleForTesting
+  public void initializeSequences() throws IOException
+  {
     if (!restoreSequences()) {
       final TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>> checkpoints = getCheckPointsFromContext(
           toolbox,
-          task.getContextValue("checkpoints")
+          task.getContextValue(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)
       );
       if (checkpoints != null) {
-        boolean exclusive = false;
         Iterator<Map.Entry<Integer, Map<PartitionIdType, SequenceOffsetType>>> sequenceOffsets = checkpoints.entrySet()
                                                                                                             .iterator();
         Map.Entry<Integer, Map<PartitionIdType, SequenceOffsetType>> previous = sequenceOffsets.next();
         while (sequenceOffsets.hasNext()) {
           Map.Entry<Integer, Map<PartitionIdType, SequenceOffsetType>> current = sequenceOffsets.next();
-          addSequence(new SequenceMetadata<>(
-              previous.getKey(),
-              StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
-              previous.getValue(),
-              current.getValue(),
-              true,
-              exclusive ? previous.getValue().keySet() : null
-          ));
+          final Set<PartitionIdType> exclusiveStartPartitions = computeExclusiveStartPartitionsForSequence(
+              previous.getValue()
+          );
+          addSequence(
+              new SequenceMetadata<>(
+                  previous.getKey(),
+                  StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
+                  previous.getValue(),
+                  current.getValue(),
+                  true,
+                  exclusiveStartPartitions
+              )
+          );
           previous = current;
-          exclusive = true;
         }
-        addSequence(new SequenceMetadata<>(
-            previous.getKey(),
-            StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
-            previous.getValue(),
-            endOffsets,
-            false,
-            exclusive ? previous.getValue().keySet() : null
-        ));
+        final Set<PartitionIdType> exclusiveStartPartitions = computeExclusiveStartPartitionsForSequence(
+            previous.getValue()
+        );
+        addSequence(
+            new SequenceMetadata<>(
+                previous.getKey(),
+                StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
+                previous.getValue(),
+                endOffsets,
+                false,
+                exclusiveStartPartitions
+            )
+        );
       } else {
-        addSequence(new SequenceMetadata<>(
-            0,
-            StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0),
-            ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap(),
-            endOffsets,
-            false,
-            null
-        ));
+        addSequence(
+            new SequenceMetadata<>(
+                0,
+                StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0),
+                ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap(),
+                endOffsets,
+                false,
+                ioConfig.getStartSequenceNumbers().getExclusivePartitions()
+            )
+        );
       }
     }
 
     log.info("Starting with sequences:  %s", sequences);
+  }
+
+  private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
+  {
+    log.info("SeekableStream indexing task starting up!");
+    startTime = DateTimes.nowUtc();
+    status = Status.STARTING;
+
+    setToolbox(toolbox);
+    initializeSequences();
 
     if (chatHandlerProvider.isPresent()) {
       log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName());
@@ -392,10 +425,10 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
         }
         // sequences size can be 0 only when all sequences got published and task stopped before it could finish
         // which is super rare
-        if (sequences.size() == 0 || sequences.get(sequences.size() - 1).isCheckpointed()) {
+        if (sequences.size() == 0 || getLastSequenceMetadata().isCheckpointed()) {
           this.endOffsets.putAll(sequences.size() == 0
                                  ? currOffsets
-                                 : sequences.get(sequences.size() - 1).getEndOffsets());
+                                 : getLastSequenceMetadata().getEndOffsets());
           log.info("End sequences changed to [%s]", endOffsets);
         }
       }
@@ -479,7 +512,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
           }
 
           // if stop is requested or task's end sequence is set by call to setEndOffsets method with finish set to true
-          if (stopRequested.get() || sequences.size() == 0 || sequences.get(sequences.size() - 1).isCheckpointed()) {
+          if (stopRequested.get() || sequences.size() == 0 || getLastSequenceMetadata().isCheckpointed()) {
             status = Status.PUBLISHING;
           }
 
@@ -628,12 +661,12 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
           }
 
           if (System.currentTimeMillis() > nextCheckpointTime) {
-            sequenceToCheckpoint = sequences.get(sequences.size() - 1);
+            sequenceToCheckpoint = getLastSequenceMetadata();
           }
 
           if (sequenceToCheckpoint != null && stillReading) {
             Preconditions.checkArgument(
-                sequences.get(sequences.size() - 1)
+                getLastSequenceMetadata()
                          .getSequenceName()
                          .equals(sequenceToCheckpoint.getSequenceName()),
                 "Cannot checkpoint a sequence [%s] which is not the latest one, sequences %s",
@@ -649,7 +682,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
                     new SeekableStreamStartSequenceNumbers<>(
                         stream,
                         sequenceToCheckpoint.getStartOffsets(),
-                        ioConfig.getStartSequenceNumbers().getExclusivePartitions()
+                        sequenceToCheckpoint.getExclusiveStartPartitions()
                     )
                 ),
                 createDataSourceMetadata(
@@ -1060,7 +1093,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
       final SequenceOffsetType startOffset = entry.getValue();
 
       if (!sequences.isEmpty()) {
-        final SequenceOffsetType priorOffset = sequences.get(sequences.size() - 1).endOffsets.get(partition);
+        final SequenceOffsetType priorOffset = getLastSequenceMetadata().endOffsets.get(partition);
 
         if (!startOffset.equals(priorOffset)) {
           throw new ISE(
@@ -1072,9 +1105,26 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
       }
     }
 
+    if (!isEndOffsetExclusive() && !sequences.isEmpty()) {
+      final SequenceMetadata<PartitionIdType, SequenceOffsetType> lastMetadata = getLastSequenceMetadata();
+      if (!lastMetadata.endOffsets.keySet().equals(sequenceMetadata.getExclusiveStartPartitions())) {
+        throw new ISE(
+            "Exclusive start partitions[%s] for new sequence don't match to the prior offset[%s]",
+            sequenceMetadata.getExclusiveStartPartitions(),
+            lastMetadata
+        );
+      }
+    }
+
     // Actually do the add.
     sequences.add(sequenceMetadata);
   }
+  
+  private SequenceMetadata<PartitionIdType, SequenceOffsetType> getLastSequenceMetadata()
+  {
+    Preconditions.checkState(!sequences.isEmpty(), "Empty sequences");
+    return sequences.get(sequences.size() - 1);
+  }
 
   /**
    * Returns true if the given record has already been read, based on lastReadOffsets.
@@ -1453,7 +1503,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
         // and after acquiring pauseLock to correctly guard against duplicate requests
         Preconditions.checkState(sequenceNumbers.size() > 0, "WTH?! No Sequences found to set end sequences");
 
-        final SequenceMetadata<PartitionIdType, SequenceOffsetType> latestSequence = sequences.get(sequences.size() - 1);
+        final SequenceMetadata<PartitionIdType, SequenceOffsetType> latestSequence = getLastSequenceMetadata();
         final Set<PartitionIdType> exclusiveStartPartitions;
 
         if (isEndOffsetExclusive()) {
@@ -1545,6 +1595,12 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     nextCheckpointTime = DateTimes.nowUtc().plus(tuningConfig.getIntermediateHandoffPeriod()).getMillis();
   }
 
+  @VisibleForTesting
+  public CopyOnWriteArrayList<SequenceMetadata<PartitionIdType, SequenceOffsetType>> getSequences()
+  {
+    return sequences;
+  }
+
   @GET
   @Path("/checkpoints")
   @Produces(MediaType.APPLICATION_JSON)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
index 9cbafd0..20eec35 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
@@ -216,11 +216,12 @@ public class SequenceMetadata<PartitionIdType, SequenceOffsetType>
     lock.lock();
     try {
       return "SequenceMetadata{" +
-             "sequenceName='" + sequenceName + '\'' +
-             ", sequenceId=" + sequenceId +
+             "sequenceId=" + sequenceId +
+             ", sequenceName='" + sequenceName + '\'' +
+             ", assignments=" + assignments +
              ", startOffsets=" + startOffsets +
+             ", exclusiveStartPartitions=" + exclusiveStartPartitions +
              ", endOffsets=" + endOffsets +
-             ", assignments=" + assignments +
              ", sentinel=" + sentinel +
              ", checkpointed=" + checkpointed +
              '}';
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 0b7a2c2..f904e85 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -122,6 +122,7 @@ import java.util.stream.Stream;
 public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType> implements Supervisor
 {
   public static final String IS_INCREMENTAL_HANDOFF_SUPPORTED = "IS_INCREMENTAL_HANDOFF_SUPPORTED";
+  public static final String CHECKPOINTS_CTX_KEY = "checkpoints";
 
   private static final long MAX_RUN_FREQUENCY_MILLIS = 1000;
   private static final long MINIMUM_FUTURE_TIMEOUT_IN_SECONDS = 120;
@@ -164,7 +165,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         ImmutableMap<PartitionIdType, SequenceOffsetType> startingSequences,
         Optional<DateTime> minimumMessageTime,
         Optional<DateTime> maximumMessageTime,
-        Set<PartitionIdType> exclusiveStartSequenceNumberPartitions
+        @Nullable Set<PartitionIdType> exclusiveStartSequenceNumberPartitions
     )
     {
       this.groupId = groupId;
@@ -174,7 +175,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       this.checkpointSequences.put(0, startingSequences);
       this.exclusiveStartSequenceNumberPartitions = exclusiveStartSequenceNumberPartitions != null
                                                     ? exclusiveStartSequenceNumberPartitions
-                                                    : new HashSet<>();
+                                                    : Collections.emptySet();
       this.baseSequenceName = generateSequenceName(startingSequences, minimumMessageTime, maximumMessageTime);
     }
 
@@ -2348,7 +2349,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       if (!getNotSetMarker().equals(sequence)) {
         // if we are given a startingOffset (set by a previous task group which is pending completion) then use it
         if (!isEndOfShard(sequence)) {
-          builder.put(partition, makeSequenceNumber(sequence, useExclusiveStartSequenceNumberForStartSequence()));
+          builder.put(partition, makeSequenceNumber(sequence, useExclusiveStartSequenceNumberForNonFirstSequence()));
         }
       } else {
         // if we don't have a startingOffset (first run or we had some previous failures and reset the sequences) then
@@ -2394,7 +2395,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
           }
         }
       }
-      return makeSequenceNumber(sequence, useExclusiveStartSequenceNumberForStartSequence());
+      return makeSequenceNumber(sequence, useExclusiveStartSequenceNumberForNonFirstSequence());
     } else {
       boolean useEarliestSequenceNumber = ioConfig.isUseEarliestSequenceNumber();
       if (subsequentlyDiscoveredPartitions.contains(partition)) {
@@ -2463,7 +2464,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     for (PartitionIdType partition : startPartitions.keySet()) {
       endPartitions.put(partition, getEndOfPartitionMarker());
     }
-    Set<PartitionIdType> exclusiveStartSequenceNumberPartitions = activelyReadingTaskGroups.get(groupId).exclusiveStartSequenceNumberPartitions;
+    Set<PartitionIdType> exclusiveStartSequenceNumberPartitions = activelyReadingTaskGroups
+        .get(groupId)
+        .exclusiveStartSequenceNumberPartitions;
 
     DateTime minimumMessageTime = activelyReadingTaskGroups.get(groupId).minimumMessageTime.orNull();
     DateTime maximumMessageTime = activelyReadingTaskGroups.get(groupId).maximumMessageTime.orNull();
@@ -2654,6 +2657,16 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     );
   }
 
+  protected Map<String, Object> createBaseTaskContexts()
+  {
+    final Map<String, Object> contexts = new HashMap<>();
+    contexts.put(IS_INCREMENTAL_HANDOFF_SUPPORTED, true);
+    if (spec.getContext() != null) {
+      contexts.putAll(spec.getContext());
+    }
+    return contexts;
+  }
+
   /**
    * creates a specific task IOConfig instance for Kafka/Kinesis
    *
@@ -2821,5 +2834,5 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * For example, in Kinesis, the start offsets are inclusive for the first sequence, but exclusive for following
    * sequences. In Kafka, start offsets are always inclusive.
    */
-  protected abstract boolean useExclusiveStartSequenceNumberForStartSequence();
+  protected abstract boolean useExclusiveStartSequenceNumberForNonFirstSequence();
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
index 961e066..2eb8858 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
@@ -37,6 +37,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 
+import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Map;
 
@@ -51,6 +52,7 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
   private final DataSchema dataSchema;
   private final SeekableStreamSupervisorTuningConfig tuningConfig;
   private final SeekableStreamSupervisorIOConfig ioConfig;
+  @Nullable
   private final Map<String, Object> context;
   protected final ServiceEmitter emitter;
   protected final DruidMonitorSchedulerConfig monitorSchedulerConfig;
@@ -61,7 +63,7 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
       @JsonProperty("dataSchema") DataSchema dataSchema,
       @JsonProperty("tuningConfig") SeekableStreamSupervisorTuningConfig tuningConfig,
       @JsonProperty("ioConfig") SeekableStreamSupervisorIOConfig ioConfig,
-      @JsonProperty("context") Map<String, Object> context,
+      @JsonProperty("context") @Nullable Map<String, Object> context,
       @JsonProperty("suspended") Boolean suspended,
       @JacksonInject TaskStorage taskStorage,
       @JacksonInject TaskMaster taskMaster,
@@ -107,6 +109,7 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
     return ioConfig;
   }
 
+  @Nullable
   @JsonProperty
   public Map<String, Object> getContext()
   {
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 421007d..22f602a 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -892,8 +892,11 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
 
     if (!startMetadataMatchesExisting) {
       // Not in the desired start state.
-      log.info("Not updating metadata, existing state is not the expected start state.");
-      log.debug("Existing database state [%s], request's start metadata [%s]", oldCommitMetadataFromDb, startMetadata);
+      log.error(
+          "Not updating metadata, existing state[%s] in metadata store doesn't match to the new start state[%s].",
+          oldCommitMetadataBytesFromDb,
+          startMetadata
+      );
       return DataSourceMetadataUpdateResult.FAILURE;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org