Posted to commits@druid.apache.org by jo...@apache.org on 2018/08/18 04:43:46 UTC

[incubator-druid] branch 0.12.3 updated: [Backport] Fix NPE for taskGroupId when rolling update (#6168) (#6188)

This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch 0.12.3
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.12.3 by this push:
     new bf1d5d7  [Backport] Fix NPE for taskGroupId when rolling update (#6168) (#6188)
bf1d5d7 is described below

commit bf1d5d7b99d3147f42a9dc91ea351aa300803dc9
Author: Jonathan Wei <jo...@users.noreply.github.com>
AuthorDate: Fri Aug 17 21:43:43 2018 -0700

    [Backport] Fix NPE for taskGroupId when rolling update (#6168) (#6188)
---
 .../io/druid/indexing/kafka/KafkaIndexTask.java    |   1 +
 .../indexing/kafka/supervisor/KafkaSupervisor.java | 110 +++++++++++++--------
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |   3 +-
 .../kafka/supervisor/KafkaSupervisorTest.java      |  91 ++++++++++++++++-
 .../CheckPointDataSourceMetadataAction.java        |  24 ++++-
 .../overlord/supervisor/SupervisorManager.java     |   5 +-
 .../overlord/supervisor/NoopSupervisorSpec.java    |   3 +-
 .../indexing/overlord/supervisor/Supervisor.java   |  10 +-
 8 files changed, 195 insertions(+), 52 deletions(-)
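
For context on the diff below: during a rolling update, index tasks still running the previous release post a CheckPointDataSourceMetadataAction that carries no taskGroupId field, so the upgraded overlord deserializes null and the action's constructor threw. A minimal sketch of the pre-fix failure, reduced to the relevant constructor line:

    // Pre-fix constructor line (simplified): Jackson passes null for the
    // missing "taskGroupId" field and checkNotNull throws the reported NPE.
    this.taskGroupId = Preconditions.checkNotNull(taskGroupId, "taskGroupId");

The patch makes taskGroupId nullable, carries the task's base sequence name in the action as well, and teaches KafkaSupervisor to resolve the task group from that name when the id is absent.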

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index 258e203..d0590dc 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -705,6 +705,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
             final CheckPointDataSourceMetadataAction checkpointAction = new CheckPointDataSourceMetadataAction(
                 getDataSource(),
                 ioConfig.getTaskGroupId(),
+                getIOConfig().getBaseSequenceName(),
                 new KafkaDataSourceMetadata(new KafkaPartitions(topic, sequenceToCheckpoint.getStartOffsets())),
                 new KafkaDataSourceMetadata(new KafkaPartitions(topic, nextOffsets))
             );
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index cb3352d..6386823 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -92,6 +92,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
@@ -143,7 +144,7 @@ public class KafkaSupervisor implements Supervisor
    * time, there should only be up to a maximum of [taskCount] actively-reading task groups (tracked in the [taskGroups]
    * map) + zero or more pending-completion task groups (tracked in [pendingCompletionTaskGroups]).
    */
-  private static class TaskGroup
+  private class TaskGroup
   {
     // This specifies the partitions and starting offsets for this task group. It is set on group creation from the data
     // in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in
@@ -157,6 +158,7 @@ public class KafkaSupervisor implements Supervisor
     final Optional<DateTime> maximumMessageTime;
     DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action
     final TreeMap<Integer, Map<Integer, Long>> sequenceOffsets = new TreeMap<>();
+    final String baseSequenceName;
 
     TaskGroup(
         ImmutableMap<Integer, Long> partitionOffsets,
@@ -168,6 +170,7 @@ public class KafkaSupervisor implements Supervisor
       this.minimumMessageTime = minimumMessageTime;
       this.maximumMessageTime = maximumMessageTime;
       this.sequenceOffsets.put(0, partitionOffsets);
+      this.baseSequenceName = generateSequenceName(partitionOffsets, minimumMessageTime, maximumMessageTime);
     }
 
     int addNewCheckpoint(Map<Integer, Long> checkpoint)
@@ -490,25 +493,29 @@ public class KafkaSupervisor implements Supervisor
   }
 
   @Override
-  public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckpoint, DataSourceMetadata currentCheckpoint)
+  public void checkpoint(
+      @Nullable Integer taskGroupId,
+      @Deprecated String baseSequenceName,
+      DataSourceMetadata previousCheckPoint,
+      DataSourceMetadata currentCheckPoint
+  )
   {
-    Preconditions.checkNotNull(previousCheckpoint, "previousCheckpoint");
-    Preconditions.checkNotNull(currentCheckpoint, "current checkpoint cannot be null");
+    Preconditions.checkNotNull(previousCheckPoint, "previousCheckpoint");
+    Preconditions.checkNotNull(currentCheckPoint, "current checkpoint cannot be null");
     Preconditions.checkArgument(
-        ioConfig.getTopic()
-                .equals(((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions()
-                                                                     .getTopic()),
+        ioConfig.getTopic().equals(((KafkaDataSourceMetadata) currentCheckPoint).getKafkaPartitions().getTopic()),
         "Supervisor topic [%s] and topic in checkpoint [%s] does not match",
         ioConfig.getTopic(),
-        ((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic()
+        ((KafkaDataSourceMetadata) currentCheckPoint).getKafkaPartitions().getTopic()
     );
 
-    log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckpoint, taskGroupId);
+    log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckPoint, taskGroupId);
     notices.add(
         new CheckpointNotice(
             taskGroupId,
-            (KafkaDataSourceMetadata) previousCheckpoint,
-            (KafkaDataSourceMetadata) currentCheckpoint
+            baseSequenceName,
+            (KafkaDataSourceMetadata) previousCheckPoint,
+            (KafkaDataSourceMetadata) currentCheckPoint
         )
     );
   }
@@ -612,17 +619,20 @@ public class KafkaSupervisor implements Supervisor
 
   private class CheckpointNotice implements Notice
   {
-    final int taskGroupId;
-    final KafkaDataSourceMetadata previousCheckpoint;
-    final KafkaDataSourceMetadata currentCheckpoint;
+    @Nullable private final Integer nullableTaskGroupId;
+    @Deprecated private final String baseSequenceName;
+    private final KafkaDataSourceMetadata previousCheckpoint;
+    private final KafkaDataSourceMetadata currentCheckpoint;
 
     CheckpointNotice(
-        int taskGroupId,
+        @Nullable Integer nullableTaskGroupId,
+        @Deprecated String baseSequenceName,
         KafkaDataSourceMetadata previousCheckpoint,
         KafkaDataSourceMetadata currentCheckpoint
     )
     {
-      this.taskGroupId = taskGroupId;
+      this.baseSequenceName = baseSequenceName;
+      this.nullableTaskGroupId = nullableTaskGroupId;
       this.previousCheckpoint = previousCheckpoint;
       this.currentCheckpoint = currentCheckpoint;
     }
@@ -630,12 +640,44 @@ public class KafkaSupervisor implements Supervisor
     @Override
     public void handle() throws ExecutionException, InterruptedException, TimeoutException
     {
+      // Find the taskGroupId from the baseSequenceName if the id is null. It can be null during a rolling update.
+      final int taskGroupId;
+      if (nullableTaskGroupId == null) {
+        // We search for the baseSequenceName in taskGroups and pendingCompletionTaskGroups sequentially. This should be fine because
+        // 1) a taskGroup can be moved from taskGroups to pendingCompletionTaskGroups in RunNotice
+        //    (see checkTaskDuration()).
+        // 2) Notices are processed by a single thread, so CheckpointNotice and RunNotice cannot be processed at the
+        //    same time.
+        final java.util.Optional<Integer> maybeGroupId = taskGroups
+            .entrySet()
+            .stream()
+            .filter(entry -> {
+              final TaskGroup taskGroup = entry.getValue();
+              return taskGroup.baseSequenceName.equals(baseSequenceName);
+            })
+            .findAny()
+            .map(Entry::getKey);
+        taskGroupId = maybeGroupId.orElse(
+            pendingCompletionTaskGroups
+                .entrySet()
+                .stream()
+                .filter(entry -> {
+                  final List<TaskGroup> taskGroups = entry.getValue();
+                  return taskGroups.stream().anyMatch(group -> group.baseSequenceName.equals(baseSequenceName));
+                })
+                .findAny()
+                .orElseThrow(() -> new ISE("Cannot find taskGroup for baseSequenceName[%s]", baseSequenceName))
+                .getKey()
+        );
+      } else {
+        taskGroupId = nullableTaskGroupId;
+      }
+
       // check for consistency
       // if already received request for this sequenceName and dataSourceMetadata combination then return
-
       final TaskGroup taskGroup = taskGroups.get(taskGroupId);
 
-      if (isValidTaskGroup(taskGroup)) {
+      if (isValidTaskGroup(taskGroupId, taskGroup)) {
         final TreeMap<Integer, Map<Integer, Long>> checkpoints = taskGroup.sequenceOffsets;
 
         // check validity of previousCheckpoint
@@ -655,20 +697,13 @@ public class KafkaSupervisor implements Supervisor
           log.info("Already checkpointed with offsets [%s]", checkpoints.lastEntry().getValue());
           return;
         }
-        final int taskGroupId = getTaskGroupIdForPartition(
-            currentCheckpoint.getKafkaPartitions()
-                             .getPartitionOffsetMap()
-                             .keySet()
-                             .iterator()
-                             .next()
-        );
         final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get();
         taskGroups.get(taskGroupId).addNewCheckpoint(newCheckpoint);
         log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId);
       }
     }
 
-    private boolean isValidTaskGroup(@Nullable TaskGroup taskGroup)
+    private boolean isValidTaskGroup(int taskGroupId, @Nullable TaskGroup taskGroup)
     {
       if (taskGroup == null) {
         // taskGroup might be in pendingCompletionTaskGroups or partitionGroups
@@ -867,17 +902,6 @@ public class KafkaSupervisor implements Supervisor
     return Joiner.on("_").join("index_kafka", dataSource, hashCode);
   }
 
-  @VisibleForTesting
-  String generateSequenceName(TaskGroup taskGroup)
-  {
-    Preconditions.checkNotNull(taskGroup, "taskGroup cannot be null");
-    return generateSequenceName(
-        taskGroup.partitionOffsets,
-        taskGroup.minimumMessageTime,
-        taskGroup.maximumMessageTime
-    );
-  }
-
   private static String getRandomId()
   {
     final StringBuilder suffix = new StringBuilder(8);
@@ -1748,7 +1772,6 @@ public class KafkaSupervisor implements Supervisor
       endPartitions.put(partition, Long.MAX_VALUE);
     }
     TaskGroup group = taskGroups.get(groupId);
-    String sequenceName = generateSequenceName(group);
 
     Map<String, String> consumerProperties = Maps.newHashMap(ioConfig.getConsumerProperties());
     DateTime minimumMessageTime = taskGroups.get(groupId).minimumMessageTime.orNull();
@@ -1756,7 +1779,7 @@ public class KafkaSupervisor implements Supervisor
 
     KafkaIOConfig kafkaIOConfig = new KafkaIOConfig(
         groupId,
-        sequenceName,
+        group.baseSequenceName,
         new KafkaPartitions(ioConfig.getTopic(), startPartitions),
         new KafkaPartitions(ioConfig.getTopic(), endPartitions),
         consumerProperties,
@@ -1777,10 +1800,10 @@ public class KafkaSupervisor implements Supervisor
                                             .putAll(spec.getContext())
                                             .build();
     for (int i = 0; i < replicas; i++) {
-      String taskId = Joiner.on("_").join(sequenceName, getRandomId());
+      String taskId = Joiner.on("_").join(group.baseSequenceName, getRandomId());
       KafkaIndexTask indexTask = new KafkaIndexTask(
           taskId,
-          new TaskResource(sequenceName, 1),
+          new TaskResource(group.baseSequenceName, 1),
           spec.getDataSchema(),
           taskTuningConfig,
           kafkaIOConfig,
@@ -1909,7 +1932,10 @@ public class KafkaSupervisor implements Supervisor
 
     String taskSequenceName = ((KafkaIndexTask) taskOptional.get()).getIOConfig().getBaseSequenceName();
     if (taskGroups.get(taskGroupId) != null) {
-      return generateSequenceName(taskGroups.get(taskGroupId)).equals(taskSequenceName);
+      return Preconditions
+          .checkNotNull(taskGroups.get(taskGroupId), "null taskGroup for taskId[%s]", taskGroupId)
+          .baseSequenceName
+          .equals(taskSequenceName);
     } else {
       return generateSequenceName(
           ((KafkaIndexTask) taskOptional.get()).getIOConfig()
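
The heart of the fix is the fallback added to CheckpointNotice.handle() above. A condensed, illustrative sketch of that lookup, with the map shapes simplified to plain base sequence names (the real maps hold TaskGroup objects keyed by group id):

    import java.util.List;
    import java.util.Map;

    class TaskGroupResolutionSketch
    {
      // Resolve the group id from the base sequence name: first among
      // actively-reading groups, then among groups pending completion.
      static int resolveTaskGroupId(
          String baseSequenceName,
          Map<Integer, String> activeGroups,          // groupId -> baseSequenceName
          Map<Integer, List<String>> pendingGroups    // groupId -> baseSequenceNames
      )
      {
        return activeGroups.entrySet().stream()
            .filter(e -> e.getValue().equals(baseSequenceName))
            .map(Map.Entry::getKey)
            .findAny()
            .orElseGet(() -> pendingGroups.entrySet().stream()
                .filter(e -> e.getValue().contains(baseSequenceName))
                .map(Map.Entry::getKey)
                .findAny()
                .orElseThrow(() -> new IllegalStateException(
                    "Cannot find taskGroup for baseSequenceName[" + baseSequenceName + "]")));
      }
    }

Scanning the two maps one after the other is safe because notices are handled by a single thread, so a group cannot move from taskGroups to pendingCompletionTaskGroups in the middle of a lookup.
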
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
index 2a852d4..d89f5e2 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -1776,7 +1776,8 @@ public class KafkaIndexTaskTest
           @Override
           public boolean checkPointDataSourceMetadata(
               String supervisorId,
-              int taskGroupId,
+              @Nullable Integer taskGroupId,
+              String baseSequenceName,
               @Nullable DataSourceMetadata previousDataSourceMetadata,
               @Nullable DataSourceMetadata currentDataSourceMetadata
           )
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 2add456..5193b5b 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -2103,6 +2103,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     supervisor.moveTaskGroupToPendingCompletion(0);
     supervisor.checkpoint(
         0,
+        ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(),
         new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoints.get(0))),
         new KafkaDataSourceMetadata(new KafkaPartitions(topic, fakeCheckpoints))
     );
@@ -2172,6 +2173,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
 
     supervisor.checkpoint(
         0,
+        ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(),
         new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap())),
         new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap()))
     );
@@ -2190,13 +2192,100 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass());
   }
 
+  @Test(timeout = 60_000L)
+  public void testCheckpointWithNullTaskGroupId()
+      throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException
+  {
+    supervisor = getSupervisor(1, 3, true, "PT1S", null, null, false);
+    //not adding any events
+    final Task id1 = createKafkaIndexTask(
+        "id1",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    final Task id2 = createKafkaIndexTask(
+        "id2",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    final Task id3 = createKafkaIndexTask(
+        "id3",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+    expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+    expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+    expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
+    expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+    expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+    expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
+    expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE))
+        .andReturn(new KafkaDataSourceMetadata(null))
+        .anyTimes();
+    taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
+    expect(taskClient.getStatusAsync(anyString()))
+        .andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING))
+        .anyTimes();
+    final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
+    checkpoints.put(0, ImmutableMap.of(0, 0L));
+    expect(taskClient.getCheckpointsAsync(anyString(), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(3);
+    expect(taskClient.getStartTimeAsync(anyString())).andReturn(Futures.immediateFuture(DateTimes.nowUtc())).anyTimes();
+    expect(taskClient.pauseAsync(anyString()))
+        .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 10L)))
+        .anyTimes();
+    expect(taskClient.setEndOffsetsAsync(anyString(), EasyMock.eq(ImmutableMap.of(0, 10L)), anyBoolean()))
+        .andReturn(Futures.immediateFuture(true))
+        .anyTimes();
+
+    replayAll();
+
+    supervisor.start();
+
+    supervisor.runInternal();
+
+    final TreeMap<Integer, Map<Integer, Long>> newCheckpoints = new TreeMap<>();
+    newCheckpoints.put(0, ImmutableMap.of(0, 10L));
+    supervisor.checkpoint(
+        null,
+        ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoints.get(0))),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, newCheckpoints.get(0)))
+    );
+
+    while (supervisor.getNoticesQueueSize() > 0) {
+      Thread.sleep(100);
+    }
+
+    verifyAll();
+  }
+
   private void addSomeEvents(int numEventsPerPartition) throws Exception
   {
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
       for (int i = 0; i < NUM_PARTITIONS; i++) {
         for (int j = 0; j < numEventsPerPartition; j++) {
           kafkaProducer.send(
-              new ProducerRecord<byte[], byte[]>(
+              new ProducerRecord<>(
                   topic,
                   i,
                   null,
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
index 083ef9e..bc1b896 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
@@ -26,23 +26,29 @@ import io.druid.indexing.common.task.Task;
 import io.druid.indexing.overlord.DataSourceMetadata;
 
 import java.io.IOException;
+import javax.annotation.Nullable;
 
 public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
 {
   private final String supervisorId;
-  private final int taskGroupId;
+  @Nullable
+  private final Integer taskGroupId;
+  @Deprecated
+  private final String baseSequenceName;
   private final DataSourceMetadata previousCheckPoint;
   private final DataSourceMetadata currentCheckPoint;
 
   public CheckPointDataSourceMetadataAction(
       @JsonProperty("supervisorId") String supervisorId,
-      @JsonProperty("taskGroupId") Integer taskGroupId,
+      @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // nullable for backward compatibility,
+      @JsonProperty("sequenceName") @Deprecated String baseSequenceName, // old version would use this
       @JsonProperty("previousCheckPoint") DataSourceMetadata previousCheckPoint,
       @JsonProperty("currentCheckPoint") DataSourceMetadata currentCheckPoint
   )
   {
     this.supervisorId = Preconditions.checkNotNull(supervisorId, "supervisorId");
-    this.taskGroupId = Preconditions.checkNotNull(taskGroupId, "taskGroupId");
+    this.taskGroupId = taskGroupId;
+    this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "sequenceName");
     this.previousCheckPoint = Preconditions.checkNotNull(previousCheckPoint, "previousCheckPoint");
     this.currentCheckPoint = Preconditions.checkNotNull(currentCheckPoint, "currentCheckPoint");
   }
@@ -53,8 +59,16 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
     return supervisorId;
   }
 
+  @Deprecated
+  @JsonProperty("sequenceName")
+  public String getBaseSequenceName()
+  {
+    return baseSequenceName;
+  }
+
+  @Nullable
   @JsonProperty
-  public int getTaskGroupId()
+  public Integer getTaskGroupId()
   {
     return taskGroupId;
   }
@@ -87,6 +101,7 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
     return toolbox.getSupervisorManager().checkPointDataSourceMetadata(
         supervisorId,
         taskGroupId,
+        baseSequenceName,
         previousCheckPoint,
         currentCheckPoint
     );
@@ -103,6 +118,7 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
   {
     return "CheckPointDataSourceMetadataAction{" +
            "supervisorId='" + supervisorId + '\'' +
+           ", baseSequenceName='" + baseSequenceName + '\'' +
            ", taskGroupId='" + taskGroupId + '\'' +
            ", previousCheckPoint=" + previousCheckPoint +
            ", currentCheckPoint=" + currentCheckPoint +
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
index 31e54cf..cf3fe27 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -159,7 +159,8 @@ public class SupervisorManager
 
   public boolean checkPointDataSourceMetadata(
       String supervisorId,
-      int taskGroupId,
+      @Nullable Integer taskGroupId,
+      String baseSequenceName,
       DataSourceMetadata previousDataSourceMetadata,
       DataSourceMetadata currentDataSourceMetadata
   )
@@ -172,7 +173,7 @@ public class SupervisorManager
 
       Preconditions.checkNotNull(supervisor, "supervisor could not be found");
 
-      supervisor.lhs.checkpoint(taskGroupId, previousDataSourceMetadata, currentDataSourceMetadata);
+      supervisor.lhs.checkpoint(taskGroupId, baseSequenceName, previousDataSourceMetadata, currentDataSourceMetadata);
       return true;
     }
     catch (Exception e) {
diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
index 0ba0701..8f8105e 100644
--- a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
+++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
@@ -83,7 +83,8 @@ public class NoopSupervisorSpec implements SupervisorSpec
 
       @Override
       public void checkpoint(
-          int taskGroupId,
+          @Nullable Integer taskGroupId,
+          String baseSequenceName,
           DataSourceMetadata previousCheckPoint,
           DataSourceMetadata currentCheckPoint
       )
diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
index 58e73ff..3f90766 100644
--- a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
+++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
@@ -21,6 +21,8 @@ package io.druid.indexing.overlord.supervisor;
 
 import io.druid.indexing.overlord.DataSourceMetadata;
 
+import javax.annotation.Nullable;
+
 public interface Supervisor
 {
   void start();
@@ -44,8 +46,14 @@ public interface Supervisor
    * represented by {@param currentCheckpoint} DataSourceMetadata
    *
    * @param taskGroupId        unique Identifier to figure out for which sequence to do checkpointing
+   * @param baseSequenceName   the base sequence name of the task group; used to look up the group when taskGroupId is null
    * @param previousCheckPoint DataSourceMetadata checkpointed in previous call
    * @param currentCheckPoint  current DataSourceMetadata to be checkpointed
    */
-  void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint);
+  void checkpoint(
+      @Nullable Integer taskGroupId,
+      @Deprecated String baseSequenceName,
+      DataSourceMetadata previousCheckPoint,
+      DataSourceMetadata currentCheckPoint
+  );
 }
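
For third-party Supervisor implementations, the upshot of the new signature is that taskGroupId can no longer be assumed non-null. A minimal sketch of a conforming override, assuming a supervisor-specific resolution helper (hypothetical here; compare the lookup sketched after the KafkaSupervisor diff):

    @Override
    public void checkpoint(
        @Nullable Integer taskGroupId,
        @Deprecated String baseSequenceName,
        DataSourceMetadata previousCheckPoint,
        DataSourceMetadata currentCheckPoint
    )
    {
      // A null taskGroupId means the request came from a task running the
      // previous release; identify the group by its base sequence name instead.
      final int groupId = taskGroupId != null
          ? taskGroupId
          : resolveTaskGroupId(baseSequenceName); // hypothetical helper
      // ... proceed to checkpoint the group identified by groupId ...
    }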


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org