You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/08/18 04:43:44 UTC

[GitHub] jon-wei closed pull request #6188: [Backport] Fix NPE for taskGroupId when rolling update (#6168)

jon-wei closed pull request #6188: [Backport] Fix NPE for taskGroupId when rolling update (#6168)
URL: https://github.com/apache/incubator-druid/pull/6188
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index 5f039c0336b..c14354c0d21 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -705,6 +705,7 @@ public void onFailure(Throwable t)
             final CheckPointDataSourceMetadataAction checkpointAction = new CheckPointDataSourceMetadataAction(
                 getDataSource(),
                 ioConfig.getTaskGroupId(),
+                getIOConfig().getBaseSequenceName(),
                 new KafkaDataSourceMetadata(new KafkaPartitions(topic, sequenceToCheckpoint.getStartOffsets())),
                 new KafkaDataSourceMetadata(new KafkaPartitions(topic, nextOffsets))
             );
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index cb3352d9402..63868230c3d 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -92,6 +92,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
@@ -143,7 +144,7 @@
    * time, there should only be up to a maximum of [taskCount] actively-reading task groups (tracked in the [taskGroups]
    * map) + zero or more pending-completion task groups (tracked in [pendingCompletionTaskGroups]).
    */
-  private static class TaskGroup
+  private class TaskGroup
   {
     // This specifies the partitions and starting offsets for this task group. It is set on group creation from the data
     // in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in
@@ -157,6 +158,7 @@
     final Optional<DateTime> maximumMessageTime;
     DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action
     final TreeMap<Integer, Map<Integer, Long>> sequenceOffsets = new TreeMap<>();
+    final String baseSequenceName;
 
     TaskGroup(
         ImmutableMap<Integer, Long> partitionOffsets,
@@ -168,6 +170,7 @@
       this.minimumMessageTime = minimumMessageTime;
       this.maximumMessageTime = maximumMessageTime;
       this.sequenceOffsets.put(0, partitionOffsets);
+      this.baseSequenceName = generateSequenceName(partitionOffsets, minimumMessageTime, maximumMessageTime);
     }
 
     int addNewCheckpoint(Map<Integer, Long> checkpoint)
@@ -490,25 +493,29 @@ public void reset(DataSourceMetadata dataSourceMetadata)
   }
 
   @Override
-  public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckpoint, DataSourceMetadata currentCheckpoint)
+  public void checkpoint(
+      @Nullable Integer taskGroupId,
+      @Deprecated String baseSequenceName,
+      DataSourceMetadata previousCheckPoint,
+      DataSourceMetadata currentCheckPoint
+  )
   {
-    Preconditions.checkNotNull(previousCheckpoint, "previousCheckpoint");
-    Preconditions.checkNotNull(currentCheckpoint, "current checkpoint cannot be null");
+    Preconditions.checkNotNull(previousCheckPoint, "previousCheckpoint");
+    Preconditions.checkNotNull(currentCheckPoint, "current checkpoint cannot be null");
     Preconditions.checkArgument(
-        ioConfig.getTopic()
-                .equals(((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions()
-                                                                     .getTopic()),
+        ioConfig.getTopic().equals(((KafkaDataSourceMetadata) currentCheckPoint).getKafkaPartitions().getTopic()),
         "Supervisor topic [%s] and topic in checkpoint [%s] does not match",
         ioConfig.getTopic(),
-        ((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic()
+        ((KafkaDataSourceMetadata) currentCheckPoint).getKafkaPartitions().getTopic()
     );
 
-    log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckpoint, taskGroupId);
+    log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckPoint, taskGroupId);
     notices.add(
         new CheckpointNotice(
             taskGroupId,
-            (KafkaDataSourceMetadata) previousCheckpoint,
-            (KafkaDataSourceMetadata) currentCheckpoint
+            baseSequenceName,
+            (KafkaDataSourceMetadata) previousCheckPoint,
+            (KafkaDataSourceMetadata) currentCheckPoint
         )
     );
   }
@@ -612,17 +619,20 @@ public void handle()
 
   private class CheckpointNotice implements Notice
   {
-    final int taskGroupId;
-    final KafkaDataSourceMetadata previousCheckpoint;
-    final KafkaDataSourceMetadata currentCheckpoint;
+    @Nullable private final Integer nullableTaskGroupId;
+    @Deprecated private final String baseSequenceName;
+    private final KafkaDataSourceMetadata previousCheckpoint;
+    private final KafkaDataSourceMetadata currentCheckpoint;
 
     CheckpointNotice(
-        int taskGroupId,
+        @Nullable Integer nullableTaskGroupId,
+        @Deprecated String baseSequenceName,
         KafkaDataSourceMetadata previousCheckpoint,
         KafkaDataSourceMetadata currentCheckpoint
     )
     {
-      this.taskGroupId = taskGroupId;
+      this.baseSequenceName = baseSequenceName;
+      this.nullableTaskGroupId = nullableTaskGroupId;
       this.previousCheckpoint = previousCheckpoint;
       this.currentCheckpoint = currentCheckpoint;
     }
@@ -630,12 +640,44 @@ public void handle()
     @Override
     public void handle() throws ExecutionException, InterruptedException, TimeoutException
     {
+      // Find taskGroupId using taskId if it's null. It can be null while rolling update.
+      final int taskGroupId;
+      if (nullableTaskGroupId == null) {
+        // We search taskId in taskGroups and pendingCompletionTaskGroups sequentially. This should be fine because
+        // 1) a taskGroup can be moved from taskGroups to pendingCompletionTaskGroups in RunNotice
+        //    (see checkTaskDuration()).
+        // 2) Notices are proceesed by a single thread. So, CheckpointNotice and RunNotice cannot be processed at the
+        //    same time.
+        final java.util.Optional<Integer> maybeGroupId = taskGroups
+            .entrySet()
+            .stream()
+            .filter(entry -> {
+              final TaskGroup taskGroup = entry.getValue();
+              return taskGroup.baseSequenceName.equals(baseSequenceName);
+            })
+            .findAny()
+            .map(Entry::getKey);
+        taskGroupId = maybeGroupId.orElse(
+            pendingCompletionTaskGroups
+                .entrySet()
+                .stream()
+                .filter(entry -> {
+                  final List<TaskGroup> taskGroups = entry.getValue();
+                  return taskGroups.stream().anyMatch(group -> group.baseSequenceName.equals(baseSequenceName));
+                })
+                .findAny()
+                .orElseThrow(() -> new ISE("Cannot find taskGroup for baseSequenceName[%s]", baseSequenceName))
+                .getKey()
+        );
+      } else {
+        taskGroupId = nullableTaskGroupId;
+      }
+
       // check for consistency
       // if already received request for this sequenceName and dataSourceMetadata combination then return
-
       final TaskGroup taskGroup = taskGroups.get(taskGroupId);
 
-      if (isValidTaskGroup(taskGroup)) {
+      if (isValidTaskGroup(taskGroupId, taskGroup)) {
         final TreeMap<Integer, Map<Integer, Long>> checkpoints = taskGroup.sequenceOffsets;
 
         // check validity of previousCheckpoint
@@ -655,20 +697,13 @@ public void handle() throws ExecutionException, InterruptedException, TimeoutExc
           log.info("Already checkpointed with offsets [%s]", checkpoints.lastEntry().getValue());
           return;
         }
-        final int taskGroupId = getTaskGroupIdForPartition(
-            currentCheckpoint.getKafkaPartitions()
-                             .getPartitionOffsetMap()
-                             .keySet()
-                             .iterator()
-                             .next()
-        );
         final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get();
         taskGroups.get(taskGroupId).addNewCheckpoint(newCheckpoint);
         log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId);
       }
     }
 
-    private boolean isValidTaskGroup(@Nullable TaskGroup taskGroup)
+    private boolean isValidTaskGroup(int taskGroupId, @Nullable TaskGroup taskGroup)
     {
       if (taskGroup == null) {
         // taskGroup might be in pendingCompletionTaskGroups or partitionGroups
@@ -867,17 +902,6 @@ String generateSequenceName(
     return Joiner.on("_").join("index_kafka", dataSource, hashCode);
   }
 
-  @VisibleForTesting
-  String generateSequenceName(TaskGroup taskGroup)
-  {
-    Preconditions.checkNotNull(taskGroup, "taskGroup cannot be null");
-    return generateSequenceName(
-        taskGroup.partitionOffsets,
-        taskGroup.minimumMessageTime,
-        taskGroup.maximumMessageTime
-    );
-  }
-
   private static String getRandomId()
   {
     final StringBuilder suffix = new StringBuilder(8);
@@ -1748,7 +1772,6 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc
       endPartitions.put(partition, Long.MAX_VALUE);
     }
     TaskGroup group = taskGroups.get(groupId);
-    String sequenceName = generateSequenceName(group);
 
     Map<String, String> consumerProperties = Maps.newHashMap(ioConfig.getConsumerProperties());
     DateTime minimumMessageTime = taskGroups.get(groupId).minimumMessageTime.orNull();
@@ -1756,7 +1779,7 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc
 
     KafkaIOConfig kafkaIOConfig = new KafkaIOConfig(
         groupId,
-        sequenceName,
+        group.baseSequenceName,
         new KafkaPartitions(ioConfig.getTopic(), startPartitions),
         new KafkaPartitions(ioConfig.getTopic(), endPartitions),
         consumerProperties,
@@ -1777,10 +1800,10 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc
                                             .putAll(spec.getContext())
                                             .build();
     for (int i = 0; i < replicas; i++) {
-      String taskId = Joiner.on("_").join(sequenceName, getRandomId());
+      String taskId = Joiner.on("_").join(group.baseSequenceName, getRandomId());
       KafkaIndexTask indexTask = new KafkaIndexTask(
           taskId,
-          new TaskResource(sequenceName, 1),
+          new TaskResource(group.baseSequenceName, 1),
           spec.getDataSchema(),
           taskTuningConfig,
           kafkaIOConfig,
@@ -1909,7 +1932,10 @@ private boolean isTaskCurrent(int taskGroupId, String taskId)
 
     String taskSequenceName = ((KafkaIndexTask) taskOptional.get()).getIOConfig().getBaseSequenceName();
     if (taskGroups.get(taskGroupId) != null) {
-      return generateSequenceName(taskGroups.get(taskGroupId)).equals(taskSequenceName);
+      return Preconditions
+          .checkNotNull(taskGroups.get(taskGroupId), "null taskGroup for taskId[%s]", taskGroupId)
+          .baseSequenceName
+          .equals(taskSequenceName);
     } else {
       return generateSequenceName(
           ((KafkaIndexTask) taskOptional.get()).getIOConfig()
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
index 2a852d4a8ad..d89f5e2f026 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -1776,7 +1776,8 @@ private void makeToolboxFactory() throws IOException
           @Override
           public boolean checkPointDataSourceMetadata(
               String supervisorId,
-              int taskGroupId,
+              @Nullable Integer taskGroupId,
+              String baseSequenceName,
               @Nullable DataSourceMetadata previousDataSourceMetadata,
               @Nullable DataSourceMetadata currentDataSourceMetadata
           )
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 2add456df8e..5193b5bbab8 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -2103,6 +2103,7 @@ public void testCheckpointForInactiveTaskGroup()
     supervisor.moveTaskGroupToPendingCompletion(0);
     supervisor.checkpoint(
         0,
+        ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(),
         new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoints.get(0))),
         new KafkaDataSourceMetadata(new KafkaPartitions(topic, fakeCheckpoints))
     );
@@ -2172,6 +2173,7 @@ public void testCheckpointForUnknownTaskGroup() throws InterruptedException
 
     supervisor.checkpoint(
         0,
+        ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(),
         new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap())),
         new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap()))
     );
@@ -2190,13 +2192,100 @@ public void testCheckpointForUnknownTaskGroup() throws InterruptedException
     Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass());
   }
 
+  @Test(timeout = 60_000L)
+  public void testCheckpointWithNullTaskGroupId()
+      throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException
+  {
+    supervisor = getSupervisor(1, 3, true, "PT1S", null, null, false);
+    //not adding any events
+    final Task id1 = createKafkaIndexTask(
+        "id1",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    final Task id2 = createKafkaIndexTask(
+        "id2",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    final Task id3 = createKafkaIndexTask(
+        "id3",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+    expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+    expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+    expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
+    expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+    expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+    expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
+    expect(
+        indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null)
+    ).anyTimes();
+    taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
+    expect(taskClient.getStatusAsync(anyString()))
+        .andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING))
+        .anyTimes();
+    final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
+    checkpoints.put(0, ImmutableMap.of(0, 0L));
+    expect(taskClient.getCheckpointsAsync(anyString(), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(3);
+    expect(taskClient.getStartTimeAsync(anyString())).andReturn(Futures.immediateFuture(DateTimes.nowUtc())).anyTimes();
+    expect(taskClient.pauseAsync(anyString()))
+        .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 10L)))
+        .anyTimes();
+    expect(taskClient.setEndOffsetsAsync(anyString(), EasyMock.eq(ImmutableMap.of(0, 10L)), anyBoolean()))
+        .andReturn(Futures.immediateFuture(true))
+        .anyTimes();
+
+    replayAll();
+
+    supervisor.start();
+
+    supervisor.runInternal();
+
+    final TreeMap<Integer, Map<Integer, Long>> newCheckpoints = new TreeMap<>();
+    newCheckpoints.put(0, ImmutableMap.of(0, 10L));
+    supervisor.checkpoint(
+        null,
+        ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoints.get(0))),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, newCheckpoints.get(0)))
+    );
+
+    while (supervisor.getNoticesQueueSize() > 0) {
+      Thread.sleep(100);
+    }
+
+    verifyAll();
+  }
+
   private void addSomeEvents(int numEventsPerPartition) throws Exception
   {
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
       for (int i = 0; i < NUM_PARTITIONS; i++) {
         for (int j = 0; j < numEventsPerPartition; j++) {
           kafkaProducer.send(
-              new ProducerRecord<byte[], byte[]>(
+              new ProducerRecord<>(
                   topic,
                   i,
                   null,
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
index 083ef9e5355..bc1b896d429 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
@@ -26,23 +26,29 @@
 import io.druid.indexing.overlord.DataSourceMetadata;
 
 import java.io.IOException;
+import javax.annotation.Nullable;
 
 public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
 {
   private final String supervisorId;
-  private final int taskGroupId;
+  @Nullable
+  private final Integer taskGroupId;
+  @Deprecated
+  private final String baseSequenceName;
   private final DataSourceMetadata previousCheckPoint;
   private final DataSourceMetadata currentCheckPoint;
 
   public CheckPointDataSourceMetadataAction(
       @JsonProperty("supervisorId") String supervisorId,
-      @JsonProperty("taskGroupId") Integer taskGroupId,
+      @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // nullable for backward compatibility,
+      @JsonProperty("sequenceName") @Deprecated String baseSequenceName, // old version would use this
       @JsonProperty("previousCheckPoint") DataSourceMetadata previousCheckPoint,
       @JsonProperty("currentCheckPoint") DataSourceMetadata currentCheckPoint
   )
   {
     this.supervisorId = Preconditions.checkNotNull(supervisorId, "supervisorId");
-    this.taskGroupId = Preconditions.checkNotNull(taskGroupId, "taskGroupId");
+    this.taskGroupId = taskGroupId;
+    this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "sequenceName");
     this.previousCheckPoint = Preconditions.checkNotNull(previousCheckPoint, "previousCheckPoint");
     this.currentCheckPoint = Preconditions.checkNotNull(currentCheckPoint, "currentCheckPoint");
   }
@@ -53,8 +59,16 @@ public String getSupervisorId()
     return supervisorId;
   }
 
+  @Deprecated
+  @JsonProperty("sequenceName")
+  public String getBaseSequenceName()
+  {
+    return baseSequenceName;
+  }
+
+  @Nullable
   @JsonProperty
-  public int getTaskGroupId()
+  public Integer getTaskGroupId()
   {
     return taskGroupId;
   }
@@ -87,6 +101,7 @@ public Boolean perform(
     return toolbox.getSupervisorManager().checkPointDataSourceMetadata(
         supervisorId,
         taskGroupId,
+        baseSequenceName,
         previousCheckPoint,
         currentCheckPoint
     );
@@ -103,6 +118,7 @@ public String toString()
   {
     return "CheckPointDataSourceMetadataAction{" +
            "supervisorId='" + supervisorId + '\'' +
+           ", baseSequenceName='" + baseSequenceName + '\'' +
            ", taskGroupId='" + taskGroupId + '\'' +
            ", previousCheckPoint=" + previousCheckPoint +
            ", currentCheckPoint=" + currentCheckPoint +
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
index 31e54cfd521..cf3fe27e9c0 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -159,7 +159,8 @@ public boolean resetSupervisor(String id, @Nullable DataSourceMetadata dataSourc
 
   public boolean checkPointDataSourceMetadata(
       String supervisorId,
-      int taskGroupId,
+      @Nullable Integer taskGroupId,
+      String baseSequenceName,
       DataSourceMetadata previousDataSourceMetadata,
       DataSourceMetadata currentDataSourceMetadata
   )
@@ -172,7 +173,7 @@ public boolean checkPointDataSourceMetadata(
 
       Preconditions.checkNotNull(supervisor, "supervisor could not be found");
 
-      supervisor.lhs.checkpoint(taskGroupId, previousDataSourceMetadata, currentDataSourceMetadata);
+      supervisor.lhs.checkpoint(taskGroupId, baseSequenceName, previousDataSourceMetadata, currentDataSourceMetadata);
       return true;
     }
     catch (Exception e) {
diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
index 0ba0701e82d..8f8105ea8ab 100644
--- a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
+++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
@@ -83,7 +83,8 @@ public void reset(DataSourceMetadata dataSourceMetadata) {}
 
       @Override
       public void checkpoint(
-          int taskGroupId,
+          @Nullable Integer taskGroupId,
+          String baseSequenceName,
           DataSourceMetadata previousCheckPoint,
           DataSourceMetadata currentCheckPoint
       )
diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
index 58e73ff3df7..3f90766e3cd 100644
--- a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
+++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
@@ -21,6 +21,8 @@
 
 import io.druid.indexing.overlord.DataSourceMetadata;
 
+import javax.annotation.Nullable;
+
 public interface Supervisor
 {
   void start();
@@ -44,8 +46,14 @@
    * represented by {@param currentCheckpoint} DataSourceMetadata
    *
    * @param taskGroupId        unique Identifier to figure out for which sequence to do checkpointing
+   * @param baseSequenceName   baseSequenceName
    * @param previousCheckPoint DataSourceMetadata checkpointed in previous call
    * @param currentCheckPoint  current DataSourceMetadata to be checkpointed
    */
-  void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint);
+  void checkpoint(
+      @Nullable Integer taskGroupId,
+      @Deprecated String baseSequenceName,
+      DataSourceMetadata previousCheckPoint,
+      DataSourceMetadata currentCheckPoint
+  );
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org