You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2018/08/27 15:02:06 UTC
[incubator-druid] branch 0.12.3 updated: Fix NPE in KafkaSupervisor.checkpointTaskGroup (#6206) (#6245)
This is an automated email from the ASF dual-hosted git repository.
fjy pushed a commit to branch 0.12.3
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.12.3 by this push:
new e8966a7 Fix NPE in KafkaSupervisor.checkpointTaskGroup (#6206) (#6245)
e8966a7 is described below
commit e8966a7224076b63aaa12c6e258e299838d8502f
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Mon Aug 27 08:01:59 2018 -0700
Fix NPE in KafkaSupervisor.checkpointTaskGroup (#6206) (#6245)
* Fix NPE in KafkaSupervisor.checkpointTaskGroup
* address comments
* address comment
---
.../indexing/kafka/supervisor/KafkaSupervisor.java | 154 +++++++++++++--------
.../indexing/kafka/supervisor/TaskReportData.java | 2 +-
2 files changed, 100 insertions(+), 56 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 6386823..431fe0f 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -146,6 +146,8 @@ public class KafkaSupervisor implements Supervisor
*/
private class TaskGroup
{
+ final int groupId;
+
// This specifies the partitions and starting offsets for this task group. It is set on group creation from the data
// in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in
// this task group has completed successfully, at which point this will be destroyed and a new task group will be
@@ -161,11 +163,13 @@ public class KafkaSupervisor implements Supervisor
final String baseSequenceName;
TaskGroup(
+ int groupId,
ImmutableMap<Integer, Long> partitionOffsets,
Optional<DateTime> minimumMessageTime,
Optional<DateTime> maximumMessageTime
)
{
+ this.groupId = groupId;
this.partitionOffsets = partitionOffsets;
this.minimumMessageTime = minimumMessageTime;
this.maximumMessageTime = maximumMessageTime;
@@ -187,9 +191,21 @@ public class KafkaSupervisor implements Supervisor
private static class TaskData
{
+ @Nullable
volatile TaskStatus status;
+ @Nullable
volatile DateTime startTime;
volatile Map<Integer, Long> currentOffsets = new HashMap<>();
+
+ @Override
+ public String toString()
+ {
+ return "TaskData{" +
+ "status=" + status +
+ ", startTime=" + startTime +
+ ", currentOffsets=" + currentOffsets +
+ '}';
+ }
}
// Map<{group ID}, {actively reading task group}>; see documentation for TaskGroup class
@@ -697,8 +713,8 @@ public class KafkaSupervisor implements Supervisor
log.info("Already checkpointed with offsets [%s]", checkpoints.lastEntry().getValue());
return;
}
- final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get();
- taskGroups.get(taskGroupId).addNewCheckpoint(newCheckpoint);
+ final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroup, false).get();
+ taskGroup.addNewCheckpoint(newCheckpoint);
log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId);
}
}
@@ -764,10 +780,13 @@ public class KafkaSupervisor implements Supervisor
: currentMetadata.getKafkaPartitions()
.getPartitionOffsetMap()
.get(resetPartitionOffset.getKey());
- final TaskGroup partitionTaskGroup = taskGroups.get(getTaskGroupIdForPartition(resetPartitionOffset.getKey()));
- if (partitionOffsetInMetadataStore != null ||
- (partitionTaskGroup != null && partitionTaskGroup.partitionOffsets.get(resetPartitionOffset.getKey())
- .equals(resetPartitionOffset.getValue()))) {
+ final TaskGroup partitionTaskGroup = taskGroups.get(
+ getTaskGroupIdForPartition(resetPartitionOffset.getKey())
+ );
+ final boolean isSameOffset = partitionTaskGroup != null
+ && partitionTaskGroup.partitionOffsets.get(resetPartitionOffset.getKey())
+ .equals(resetPartitionOffset.getValue());
+ if (partitionOffsetInMetadataStore != null || isSameOffset) {
doReset = true;
break;
}
@@ -991,7 +1010,7 @@ public class KafkaSupervisor implements Supervisor
List<String> futureTaskIds = Lists.newArrayList();
List<ListenableFuture<Boolean>> futures = Lists.newArrayList();
List<Task> tasks = taskStorage.getActiveTasks();
- final Set<Integer> taskGroupsToVerify = new HashSet<>();
+ final Map<Integer, TaskGroup> taskGroupsToVerify = new HashMap<>();
for (Task task : tasks) {
if (!(task instanceof KafkaIndexTask) || !dataSource.equals(task.getDataSource())) {
@@ -1098,6 +1117,7 @@ public class KafkaSupervisor implements Supervisor
k -> {
log.info("Creating a new task group for taskGroupId[%d]", taskGroupId);
return new TaskGroup(
+ taskGroupId,
ImmutableMap.copyOf(
kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap()
),
@@ -1106,8 +1126,15 @@ public class KafkaSupervisor implements Supervisor
);
}
);
- taskGroupsToVerify.add(taskGroupId);
- taskGroup.tasks.putIfAbsent(taskId, new TaskData());
+ taskGroupsToVerify.put(taskGroupId, taskGroup);
+ final TaskData prevTaskGroup = taskGroup.tasks.putIfAbsent(taskId, new TaskData());
+ if (prevTaskGroup != null) {
+ throw new ISE(
+ "WTH? a taskGroup[%s] already exists for new task[%s]",
+ prevTaskGroup,
+ taskId
+ );
+ }
}
}
return true;
@@ -1135,7 +1162,7 @@ public class KafkaSupervisor implements Supervisor
log.debug("Found [%d] Kafka indexing tasks for dataSource [%s]", taskCount, dataSource);
// make sure the checkpoints are consistent with each other and with the metadata store
- taskGroupsToVerify.forEach(this::verifyAndMergeCheckpoints);
+ taskGroupsToVerify.values().forEach(this::verifyAndMergeCheckpoints);
}
/**
@@ -1145,10 +1172,9 @@ public class KafkaSupervisor implements Supervisor
* 2. truncates the checkpoints in the taskGroup corresponding to which segments have been published, so that any newly
* created tasks for the taskGroup start indexing from after the latest published offsets.
*/
- private void verifyAndMergeCheckpoints(final Integer groupId)
+ private void verifyAndMergeCheckpoints(final TaskGroup taskGroup)
{
- final TaskGroup taskGroup = taskGroups.get(groupId);
-
+ final int groupId = taskGroup.groupId;
// List<TaskId, Map -> {SequenceId, Checkpoints}>
final List<Pair<String, TreeMap<Integer, Map<Integer, Long>>>> taskSequences = new CopyOnWriteArrayList<>();
final List<ListenableFuture<TreeMap<Integer, Map<Integer, Long>>>> futures = new ArrayList<>();
@@ -1302,6 +1328,7 @@ public class KafkaSupervisor implements Supervisor
// reading the minimumMessageTime & maximumMessageTime from the publishing task and setting it here is not necessary as this task cannot
// change to a state where it will read any more events
TaskGroup newTaskGroup = new TaskGroup(
+ groupId,
ImmutableMap.copyOf(startingPartitions),
Optional.<DateTime>absent(),
Optional.<DateTime>absent()
@@ -1339,8 +1366,8 @@ public class KafkaSupervisor implements Supervisor
}
taskData.startTime = startTime;
- long millisRemaining = ioConfig.getTaskDuration().getMillis() - (System.currentTimeMillis()
- - taskData.startTime.getMillis());
+ long millisRemaining = ioConfig.getTaskDuration().getMillis() -
+ (System.currentTimeMillis() - taskData.startTime.getMillis());
if (millisRemaining > 0) {
scheduledExec.schedule(
buildRunTask(),
@@ -1393,7 +1420,8 @@ public class KafkaSupervisor implements Supervisor
// find the longest running task from this group
DateTime earliestTaskStart = DateTimes.nowUtc();
for (TaskData taskData : group.tasks.values()) {
- if (earliestTaskStart.isAfter(taskData.startTime)) {
+ // startTime can be null if kafkaSupervisor is stopped gracefully before processing any runNotice
+ if (taskData.startTime != null && earliestTaskStart.isAfter(taskData.startTime)) {
earliestTaskStart = taskData.startTime;
}
}
@@ -1402,7 +1430,7 @@ public class KafkaSupervisor implements Supervisor
if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
log.info("Task group [%d] has run for [%s]", groupId, ioConfig.getTaskDuration());
futureGroupIds.add(groupId);
- futures.add(checkpointTaskGroup(groupId, true));
+ futures.add(checkpointTaskGroup(group, true));
}
}
@@ -1440,10 +1468,8 @@ public class KafkaSupervisor implements Supervisor
}
}
- private ListenableFuture<Map<Integer, Long>> checkpointTaskGroup(final int groupId, final boolean finalize)
+ private ListenableFuture<Map<Integer, Long>> checkpointTaskGroup(final TaskGroup taskGroup, final boolean finalize)
{
- final TaskGroup taskGroup = taskGroups.get(groupId);
-
if (finalize) {
// 1) Check if any task completed (in which case we're done) and kill unassigned tasks
Iterator<Map.Entry<String, TaskData>> i = taskGroup.tasks.entrySet().iterator();
@@ -1452,30 +1478,33 @@ public class KafkaSupervisor implements Supervisor
String taskId = taskEntry.getKey();
TaskData task = taskEntry.getValue();
- if (task.status.isSuccess()) {
- // If any task in this group has already completed, stop the rest of the tasks in the group and return.
- // This will cause us to create a new set of tasks next cycle that will start from the offsets in
- // metadata store (which will have advanced if we succeeded in publishing and will remain the same if publishing
- // failed and we need to re-ingest)
- return Futures.transform(
- stopTasksInGroup(taskGroup),
- new Function<Object, Map<Integer, Long>>()
- {
- @Nullable
- @Override
- public Map<Integer, Long> apply(@Nullable Object input)
+ // task.status can be null if kafkaSupervisor is stopped gracefully before processing any runNotice.
+ if (task.status != null) {
+ if (task.status.isSuccess()) {
+ // If any task in this group has already completed, stop the rest of the tasks in the group and return.
+ // This will cause us to create a new set of tasks next cycle that will start from the offsets in
+ // metadata store (which will have advanced if we succeeded in publishing and will remain the same if
+ // publishing failed and we need to re-ingest)
+ return Futures.transform(
+ stopTasksInGroup(taskGroup),
+ new Function<Object, Map<Integer, Long>>()
{
- return null;
+ @Nullable
+ @Override
+ public Map<Integer, Long> apply(@Nullable Object input)
+ {
+ return null;
+ }
}
- }
- );
- }
+ );
+ }
- if (task.status.isRunnable()) {
- if (taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) {
- log.info("Killing task [%s] which hasn't been assigned to a worker", taskId);
- killTask(taskId);
- i.remove();
+ if (task.status.isRunnable()) {
+ if (taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) {
+ log.info("Killing task [%s] which hasn't been assigned to a worker", taskId);
+ killTask(taskId);
+ i.remove();
+ }
}
}
}
@@ -1522,7 +1551,7 @@ public class KafkaSupervisor implements Supervisor
final List<String> setEndOffsetTaskIds = ImmutableList.copyOf(taskGroup.taskIds());
if (setEndOffsetTaskIds.isEmpty()) {
- log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", groupId);
+ log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", taskGroup.groupId);
return null;
}
@@ -1533,11 +1562,15 @@ public class KafkaSupervisor implements Supervisor
"Checkpoint [%s] is same as the start offsets [%s] of latest sequence for the task group [%d]",
endOffsets,
taskGroup.sequenceOffsets.lastEntry().getValue(),
- groupId
+ taskGroup.groupId
);
}
- log.info("Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", groupId, endOffsets);
+ log.info(
+ "Setting endOffsets for tasks in taskGroup [%d] to %s and resuming",
+ taskGroup.groupId,
+ endOffsets
+ );
for (final String taskId : setEndOffsetTaskIds) {
setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, finalize));
}
@@ -1559,7 +1592,7 @@ public class KafkaSupervisor implements Supervisor
}
if (taskGroup.tasks.isEmpty()) {
- log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", groupId);
+ log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", taskGroup.groupId);
return null;
}
@@ -1599,11 +1632,15 @@ public class KafkaSupervisor implements Supervisor
continue;
}
- Iterator<Map.Entry<String, TaskData>> iTask = group.tasks.entrySet().iterator();
+ Iterator<Entry<String, TaskData>> iTask = group.tasks.entrySet().iterator();
while (iTask.hasNext()) {
- Map.Entry<String, TaskData> task = iTask.next();
+ final Entry<String, TaskData> entry = iTask.next();
+ final String taskId = entry.getKey();
+ final TaskData taskData = entry.getValue();
+
+ Preconditions.checkNotNull(taskData.status, "WTH? task[%s] has a null status", taskId);
- if (task.getValue().status.isFailure()) {
+ if (taskData.status.isFailure()) {
iTask.remove(); // remove failed task
if (group.tasks.isEmpty()) {
// if all tasks in the group have failed, just nuke all task groups with this partition set and restart
@@ -1612,10 +1649,10 @@ public class KafkaSupervisor implements Supervisor
}
}
- if (task.getValue().status.isSuccess()) {
+ if (taskData.status.isSuccess()) {
// If one of the pending completion tasks was successful, stop the rest of the tasks in the group as
// we no longer need them to publish their segment.
- log.info("Task [%s] completed successfully, stopping tasks %s", task.getKey(), group.taskIds());
+ log.info("Task [%s] completed successfully, stopping tasks %s", taskId, group.taskIds());
futures.add(stopTasksInGroup(group));
foundSuccess = true;
toRemove.add(group); // remove the TaskGroup from the list of pending completion task groups
@@ -1686,6 +1723,8 @@ public class KafkaSupervisor implements Supervisor
continue;
}
+ Preconditions.checkNotNull(taskData.status, "WTH? task[%s] has a null status", taskId);
+
// remove failed tasks
if (taskData.status.isFailure()) {
iTasks.remove();
@@ -1713,7 +1752,7 @@ public class KafkaSupervisor implements Supervisor
taskGroups.entrySet()
.stream()
.filter(taskGroup -> taskGroup.getValue().tasks.size() < ioConfig.getReplicas())
- .forEach(taskGroup -> verifyAndMergeCheckpoints(taskGroup.getKey()));
+ .forEach(taskGroup -> verifyAndMergeCheckpoints(taskGroup.getValue()));
// check that there is a current task group for each group of partitions in [partitionGroups]
for (Integer groupId : partitionGroups.keySet()) {
@@ -1729,6 +1768,7 @@ public class KafkaSupervisor implements Supervisor
) : Optional.<DateTime>absent());
final TaskGroup taskGroup = new TaskGroup(
+ groupId,
generateStartingOffsetsForPartitionGroup(groupId),
minimumMessageTime,
maximumMessageTime
@@ -1955,8 +1995,12 @@ public class KafkaSupervisor implements Supervisor
final List<ListenableFuture<Void>> futures = Lists.newArrayList();
for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
- if (!entry.getValue().status.isComplete()) {
- futures.add(stopTask(entry.getKey(), false));
+ final String taskId = entry.getKey();
+ final TaskData taskData = entry.getValue();
+ if (taskData.status == null) {
+ killTask(taskId);
+ } else if (!taskData.status.isComplete()) {
+ futures.add(stopTask(taskId, false));
}
}
@@ -2033,7 +2077,7 @@ public class KafkaSupervisor implements Supervisor
for (TaskGroup taskGroup : taskGroups.values()) {
for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
String taskId = entry.getKey();
- DateTime startTime = entry.getValue().startTime;
+ @Nullable DateTime startTime = entry.getValue().startTime;
Map<Integer, Long> currentOffsets = entry.getValue().currentOffsets;
Long remainingSeconds = null;
if (startTime != null) {
@@ -2060,7 +2104,7 @@ public class KafkaSupervisor implements Supervisor
for (TaskGroup taskGroup : taskGroups) {
for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
String taskId = entry.getKey();
- DateTime startTime = entry.getValue().startTime;
+ @Nullable DateTime startTime = entry.getValue().startTime;
Map<Integer, Long> currentOffsets = entry.getValue().currentOffsets;
Long remainingSeconds = null;
if (taskGroup.completionTimeout != null) {
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/TaskReportData.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/TaskReportData.java
index 923f69e..bc127e1 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/TaskReportData.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/TaskReportData.java
@@ -45,7 +45,7 @@ public class TaskReportData
String id,
@Nullable Map<Integer, Long> startingOffsets,
@Nullable Map<Integer, Long> currentOffsets,
- DateTime startTime,
+ @Nullable DateTime startTime,
Long remainingSeconds,
TaskType type,
@Nullable Map<Integer, Long> lag
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org