You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by jo...@apache.org on 2018/08/18 04:43:46 UTC
[incubator-druid] branch 0.12.3 updated: [Backport] Fix NPE for
taskGroupId when rolling update (#6168) (#6188)
This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch 0.12.3
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.12.3 by this push:
new bf1d5d7 [Backport] Fix NPE for taskGroupId when rolling update (#6168) (#6188)
bf1d5d7 is described below
commit bf1d5d7b99d3147f42a9dc91ea351aa300803dc9
Author: Jonathan Wei <jo...@users.noreply.github.com>
AuthorDate: Fri Aug 17 21:43:43 2018 -0700
[Backport] Fix NPE for taskGroupId when rolling update (#6168) (#6188)
---
.../io/druid/indexing/kafka/KafkaIndexTask.java | 1 +
.../indexing/kafka/supervisor/KafkaSupervisor.java | 110 +++++++++++++--------
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 3 +-
.../kafka/supervisor/KafkaSupervisorTest.java | 91 ++++++++++++++++-
.../CheckPointDataSourceMetadataAction.java | 24 ++++-
.../overlord/supervisor/SupervisorManager.java | 5 +-
.../overlord/supervisor/NoopSupervisorSpec.java | 3 +-
.../indexing/overlord/supervisor/Supervisor.java | 10 +-
8 files changed, 195 insertions(+), 52 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index 258e203..d0590dc 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -705,6 +705,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
final CheckPointDataSourceMetadataAction checkpointAction = new CheckPointDataSourceMetadataAction(
getDataSource(),
ioConfig.getTaskGroupId(),
+ getIOConfig().getBaseSequenceName(),
new KafkaDataSourceMetadata(new KafkaPartitions(topic, sequenceToCheckpoint.getStartOffsets())),
new KafkaDataSourceMetadata(new KafkaPartitions(topic, nextOffsets))
);
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index cb3352d..6386823 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -92,6 +92,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
@@ -143,7 +144,7 @@ public class KafkaSupervisor implements Supervisor
* time, there should only be up to a maximum of [taskCount] actively-reading task groups (tracked in the [taskGroups]
* map) + zero or more pending-completion task groups (tracked in [pendingCompletionTaskGroups]).
*/
- private static class TaskGroup
+ private class TaskGroup
{
// This specifies the partitions and starting offsets for this task group. It is set on group creation from the data
// in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in
@@ -157,6 +158,7 @@ public class KafkaSupervisor implements Supervisor
final Optional<DateTime> maximumMessageTime;
DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action
final TreeMap<Integer, Map<Integer, Long>> sequenceOffsets = new TreeMap<>();
+ final String baseSequenceName;
TaskGroup(
ImmutableMap<Integer, Long> partitionOffsets,
@@ -168,6 +170,7 @@ public class KafkaSupervisor implements Supervisor
this.minimumMessageTime = minimumMessageTime;
this.maximumMessageTime = maximumMessageTime;
this.sequenceOffsets.put(0, partitionOffsets);
+ this.baseSequenceName = generateSequenceName(partitionOffsets, minimumMessageTime, maximumMessageTime);
}
int addNewCheckpoint(Map<Integer, Long> checkpoint)
@@ -490,25 +493,29 @@ public class KafkaSupervisor implements Supervisor
}
@Override
- public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckpoint, DataSourceMetadata currentCheckpoint)
+ public void checkpoint(
+ @Nullable Integer taskGroupId,
+ @Deprecated String baseSequenceName,
+ DataSourceMetadata previousCheckPoint,
+ DataSourceMetadata currentCheckPoint
+ )
{
- Preconditions.checkNotNull(previousCheckpoint, "previousCheckpoint");
- Preconditions.checkNotNull(currentCheckpoint, "current checkpoint cannot be null");
+ Preconditions.checkNotNull(previousCheckPoint, "previousCheckpoint");
+ Preconditions.checkNotNull(currentCheckPoint, "current checkpoint cannot be null");
Preconditions.checkArgument(
- ioConfig.getTopic()
- .equals(((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions()
- .getTopic()),
+ ioConfig.getTopic().equals(((KafkaDataSourceMetadata) currentCheckPoint).getKafkaPartitions().getTopic()),
"Supervisor topic [%s] and topic in checkpoint [%s] does not match",
ioConfig.getTopic(),
- ((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic()
+ ((KafkaDataSourceMetadata) currentCheckPoint).getKafkaPartitions().getTopic()
);
- log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckpoint, taskGroupId);
+ log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckPoint, taskGroupId);
notices.add(
new CheckpointNotice(
taskGroupId,
- (KafkaDataSourceMetadata) previousCheckpoint,
- (KafkaDataSourceMetadata) currentCheckpoint
+ baseSequenceName,
+ (KafkaDataSourceMetadata) previousCheckPoint,
+ (KafkaDataSourceMetadata) currentCheckPoint
)
);
}
@@ -612,17 +619,20 @@ public class KafkaSupervisor implements Supervisor
private class CheckpointNotice implements Notice
{
- final int taskGroupId;
- final KafkaDataSourceMetadata previousCheckpoint;
- final KafkaDataSourceMetadata currentCheckpoint;
+ @Nullable private final Integer nullableTaskGroupId;
+ @Deprecated private final String baseSequenceName;
+ private final KafkaDataSourceMetadata previousCheckpoint;
+ private final KafkaDataSourceMetadata currentCheckpoint;
CheckpointNotice(
- int taskGroupId,
+ @Nullable Integer nullableTaskGroupId,
+ @Deprecated String baseSequenceName,
KafkaDataSourceMetadata previousCheckpoint,
KafkaDataSourceMetadata currentCheckpoint
)
{
- this.taskGroupId = taskGroupId;
+ this.baseSequenceName = baseSequenceName;
+ this.nullableTaskGroupId = nullableTaskGroupId;
this.previousCheckpoint = previousCheckpoint;
this.currentCheckpoint = currentCheckpoint;
}
@@ -630,12 +640,44 @@ public class KafkaSupervisor implements Supervisor
@Override
public void handle() throws ExecutionException, InterruptedException, TimeoutException
{
+ // Find taskGroupId using taskId if it's null. It can be null while rolling update.
+ final int taskGroupId;
+ if (nullableTaskGroupId == null) {
+ // We search taskId in taskGroups and pendingCompletionTaskGroups sequentially. This should be fine because
+ // 1) a taskGroup can be moved from taskGroups to pendingCompletionTaskGroups in RunNotice
+ // (see checkTaskDuration()).
+ // 2) Notices are proceesed by a single thread. So, CheckpointNotice and RunNotice cannot be processed at the
+ // same time.
+ final java.util.Optional<Integer> maybeGroupId = taskGroups
+ .entrySet()
+ .stream()
+ .filter(entry -> {
+ final TaskGroup taskGroup = entry.getValue();
+ return taskGroup.baseSequenceName.equals(baseSequenceName);
+ })
+ .findAny()
+ .map(Entry::getKey);
+ taskGroupId = maybeGroupId.orElse(
+ pendingCompletionTaskGroups
+ .entrySet()
+ .stream()
+ .filter(entry -> {
+ final List<TaskGroup> taskGroups = entry.getValue();
+ return taskGroups.stream().anyMatch(group -> group.baseSequenceName.equals(baseSequenceName));
+ })
+ .findAny()
+ .orElseThrow(() -> new ISE("Cannot find taskGroup for baseSequenceName[%s]", baseSequenceName))
+ .getKey()
+ );
+ } else {
+ taskGroupId = nullableTaskGroupId;
+ }
+
// check for consistency
// if already received request for this sequenceName and dataSourceMetadata combination then return
-
final TaskGroup taskGroup = taskGroups.get(taskGroupId);
- if (isValidTaskGroup(taskGroup)) {
+ if (isValidTaskGroup(taskGroupId, taskGroup)) {
final TreeMap<Integer, Map<Integer, Long>> checkpoints = taskGroup.sequenceOffsets;
// check validity of previousCheckpoint
@@ -655,20 +697,13 @@ public class KafkaSupervisor implements Supervisor
log.info("Already checkpointed with offsets [%s]", checkpoints.lastEntry().getValue());
return;
}
- final int taskGroupId = getTaskGroupIdForPartition(
- currentCheckpoint.getKafkaPartitions()
- .getPartitionOffsetMap()
- .keySet()
- .iterator()
- .next()
- );
final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get();
taskGroups.get(taskGroupId).addNewCheckpoint(newCheckpoint);
log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId);
}
}
- private boolean isValidTaskGroup(@Nullable TaskGroup taskGroup)
+ private boolean isValidTaskGroup(int taskGroupId, @Nullable TaskGroup taskGroup)
{
if (taskGroup == null) {
// taskGroup might be in pendingCompletionTaskGroups or partitionGroups
@@ -867,17 +902,6 @@ public class KafkaSupervisor implements Supervisor
return Joiner.on("_").join("index_kafka", dataSource, hashCode);
}
- @VisibleForTesting
- String generateSequenceName(TaskGroup taskGroup)
- {
- Preconditions.checkNotNull(taskGroup, "taskGroup cannot be null");
- return generateSequenceName(
- taskGroup.partitionOffsets,
- taskGroup.minimumMessageTime,
- taskGroup.maximumMessageTime
- );
- }
-
private static String getRandomId()
{
final StringBuilder suffix = new StringBuilder(8);
@@ -1748,7 +1772,6 @@ public class KafkaSupervisor implements Supervisor
endPartitions.put(partition, Long.MAX_VALUE);
}
TaskGroup group = taskGroups.get(groupId);
- String sequenceName = generateSequenceName(group);
Map<String, String> consumerProperties = Maps.newHashMap(ioConfig.getConsumerProperties());
DateTime minimumMessageTime = taskGroups.get(groupId).minimumMessageTime.orNull();
@@ -1756,7 +1779,7 @@ public class KafkaSupervisor implements Supervisor
KafkaIOConfig kafkaIOConfig = new KafkaIOConfig(
groupId,
- sequenceName,
+ group.baseSequenceName,
new KafkaPartitions(ioConfig.getTopic(), startPartitions),
new KafkaPartitions(ioConfig.getTopic(), endPartitions),
consumerProperties,
@@ -1777,10 +1800,10 @@ public class KafkaSupervisor implements Supervisor
.putAll(spec.getContext())
.build();
for (int i = 0; i < replicas; i++) {
- String taskId = Joiner.on("_").join(sequenceName, getRandomId());
+ String taskId = Joiner.on("_").join(group.baseSequenceName, getRandomId());
KafkaIndexTask indexTask = new KafkaIndexTask(
taskId,
- new TaskResource(sequenceName, 1),
+ new TaskResource(group.baseSequenceName, 1),
spec.getDataSchema(),
taskTuningConfig,
kafkaIOConfig,
@@ -1909,7 +1932,10 @@ public class KafkaSupervisor implements Supervisor
String taskSequenceName = ((KafkaIndexTask) taskOptional.get()).getIOConfig().getBaseSequenceName();
if (taskGroups.get(taskGroupId) != null) {
- return generateSequenceName(taskGroups.get(taskGroupId)).equals(taskSequenceName);
+ return Preconditions
+ .checkNotNull(taskGroups.get(taskGroupId), "null taskGroup for taskId[%s]", taskGroupId)
+ .baseSequenceName
+ .equals(taskSequenceName);
} else {
return generateSequenceName(
((KafkaIndexTask) taskOptional.get()).getIOConfig()
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
index 2a852d4..d89f5e2 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -1776,7 +1776,8 @@ public class KafkaIndexTaskTest
@Override
public boolean checkPointDataSourceMetadata(
String supervisorId,
- int taskGroupId,
+ @Nullable Integer taskGroupId,
+ String baseSequenceName,
@Nullable DataSourceMetadata previousDataSourceMetadata,
@Nullable DataSourceMetadata currentDataSourceMetadata
)
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 2add456..5193b5b 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -2103,6 +2103,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
supervisor.moveTaskGroupToPendingCompletion(0);
supervisor.checkpoint(
0,
+ ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(),
new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoints.get(0))),
new KafkaDataSourceMetadata(new KafkaPartitions(topic, fakeCheckpoints))
);
@@ -2172,6 +2173,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
supervisor.checkpoint(
0,
+ ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(),
new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap())),
new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap()))
);
@@ -2190,13 +2192,100 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass());
}
+ @Test(timeout = 60_000L)
+ public void testCheckpointWithNullTaskGroupId()
+ throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException
+ {
+ supervisor = getSupervisor(1, 3, true, "PT1S", null, null, false);
+ //not adding any events
+ final Task id1 = createKafkaIndexTask(
+ "id1",
+ DATASOURCE,
+ 0,
+ new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
+ new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
+ null,
+ null
+ );
+
+ final Task id2 = createKafkaIndexTask(
+ "id2",
+ DATASOURCE,
+ 0,
+ new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
+ new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
+ null,
+ null
+ );
+
+ final Task id3 = createKafkaIndexTask(
+ "id3",
+ DATASOURCE,
+ 0,
+ new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
+ new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
+ null,
+ null
+ );
+
+ expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+ expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+ expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+ expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+ expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+ expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
+ expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+ expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+ expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
+ expect(
+ indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null)
+ ).anyTimes();
+ taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
+ expect(taskClient.getStatusAsync(anyString()))
+ .andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING))
+ .anyTimes();
+ final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
+ checkpoints.put(0, ImmutableMap.of(0, 0L));
+ expect(taskClient.getCheckpointsAsync(anyString(), anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .times(3);
+ expect(taskClient.getStartTimeAsync(anyString())).andReturn(Futures.immediateFuture(DateTimes.nowUtc())).anyTimes();
+ expect(taskClient.pauseAsync(anyString()))
+ .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 10L)))
+ .anyTimes();
+ expect(taskClient.setEndOffsetsAsync(anyString(), EasyMock.eq(ImmutableMap.of(0, 10L)), anyBoolean()))
+ .andReturn(Futures.immediateFuture(true))
+ .anyTimes();
+
+ replayAll();
+
+ supervisor.start();
+
+ supervisor.runInternal();
+
+ final TreeMap<Integer, Map<Integer, Long>> newCheckpoints = new TreeMap<>();
+ newCheckpoints.put(0, ImmutableMap.of(0, 10L));
+ supervisor.checkpoint(
+ null,
+ ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(),
+ new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoints.get(0))),
+ new KafkaDataSourceMetadata(new KafkaPartitions(topic, newCheckpoints.get(0)))
+ );
+
+ while (supervisor.getNoticesQueueSize() > 0) {
+ Thread.sleep(100);
+ }
+
+ verifyAll();
+ }
+
private void addSomeEvents(int numEventsPerPartition) throws Exception
{
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
for (int i = 0; i < NUM_PARTITIONS; i++) {
for (int j = 0; j < numEventsPerPartition; j++) {
kafkaProducer.send(
- new ProducerRecord<byte[], byte[]>(
+ new ProducerRecord<>(
topic,
i,
null,
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
index 083ef9e..bc1b896 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
@@ -26,23 +26,29 @@ import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.DataSourceMetadata;
import java.io.IOException;
+import javax.annotation.Nullable;
public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
{
private final String supervisorId;
- private final int taskGroupId;
+ @Nullable
+ private final Integer taskGroupId;
+ @Deprecated
+ private final String baseSequenceName;
private final DataSourceMetadata previousCheckPoint;
private final DataSourceMetadata currentCheckPoint;
public CheckPointDataSourceMetadataAction(
@JsonProperty("supervisorId") String supervisorId,
- @JsonProperty("taskGroupId") Integer taskGroupId,
+ @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // nullable for backward compatibility,
+ @JsonProperty("sequenceName") @Deprecated String baseSequenceName, // old version would use this
@JsonProperty("previousCheckPoint") DataSourceMetadata previousCheckPoint,
@JsonProperty("currentCheckPoint") DataSourceMetadata currentCheckPoint
)
{
this.supervisorId = Preconditions.checkNotNull(supervisorId, "supervisorId");
- this.taskGroupId = Preconditions.checkNotNull(taskGroupId, "taskGroupId");
+ this.taskGroupId = taskGroupId;
+ this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "sequenceName");
this.previousCheckPoint = Preconditions.checkNotNull(previousCheckPoint, "previousCheckPoint");
this.currentCheckPoint = Preconditions.checkNotNull(currentCheckPoint, "currentCheckPoint");
}
@@ -53,8 +59,16 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
return supervisorId;
}
+ @Deprecated
+ @JsonProperty("sequenceName")
+ public String getBaseSequenceName()
+ {
+ return baseSequenceName;
+ }
+
+ @Nullable
@JsonProperty
- public int getTaskGroupId()
+ public Integer getTaskGroupId()
{
return taskGroupId;
}
@@ -87,6 +101,7 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
return toolbox.getSupervisorManager().checkPointDataSourceMetadata(
supervisorId,
taskGroupId,
+ baseSequenceName,
previousCheckPoint,
currentCheckPoint
);
@@ -103,6 +118,7 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
{
return "CheckPointDataSourceMetadataAction{" +
"supervisorId='" + supervisorId + '\'' +
+ ", baseSequenceName='" + baseSequenceName + '\'' +
", taskGroupId='" + taskGroupId + '\'' +
", previousCheckPoint=" + previousCheckPoint +
", currentCheckPoint=" + currentCheckPoint +
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
index 31e54cf..cf3fe27 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -159,7 +159,8 @@ public class SupervisorManager
public boolean checkPointDataSourceMetadata(
String supervisorId,
- int taskGroupId,
+ @Nullable Integer taskGroupId,
+ String baseSequenceName,
DataSourceMetadata previousDataSourceMetadata,
DataSourceMetadata currentDataSourceMetadata
)
@@ -172,7 +173,7 @@ public class SupervisorManager
Preconditions.checkNotNull(supervisor, "supervisor could not be found");
- supervisor.lhs.checkpoint(taskGroupId, previousDataSourceMetadata, currentDataSourceMetadata);
+ supervisor.lhs.checkpoint(taskGroupId, baseSequenceName, previousDataSourceMetadata, currentDataSourceMetadata);
return true;
}
catch (Exception e) {
diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
index 0ba0701..8f8105e 100644
--- a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
+++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
@@ -83,7 +83,8 @@ public class NoopSupervisorSpec implements SupervisorSpec
@Override
public void checkpoint(
- int taskGroupId,
+ @Nullable Integer taskGroupId,
+ String baseSequenceName,
DataSourceMetadata previousCheckPoint,
DataSourceMetadata currentCheckPoint
)
diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
index 58e73ff..3f90766 100644
--- a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
+++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
@@ -21,6 +21,8 @@ package io.druid.indexing.overlord.supervisor;
import io.druid.indexing.overlord.DataSourceMetadata;
+import javax.annotation.Nullable;
+
public interface Supervisor
{
void start();
@@ -44,8 +46,14 @@ public interface Supervisor
* represented by {@param currentCheckpoint} DataSourceMetadata
*
* @param taskGroupId unique Identifier to figure out for which sequence to do checkpointing
+ * @param baseSequenceName baseSequenceName
* @param previousCheckPoint DataSourceMetadata checkpointed in previous call
* @param currentCheckPoint current DataSourceMetadata to be checkpointed
*/
- void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint);
+ void checkpoint(
+ @Nullable Integer taskGroupId,
+ @Deprecated String baseSequenceName,
+ DataSourceMetadata previousCheckPoint,
+ DataSourceMetadata currentCheckPoint
+ );
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org