You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@druid.apache.org by fj...@apache.org on 2018/08/27 15:02:06 UTC

[incubator-druid] branch 0.12.3 updated: Fix NPE in KafkaSupervisor.checkpointTaskGroup (#6206) (#6245)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch 0.12.3
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.12.3 by this push:
     new e8966a7  Fix NPE in KafkaSupervisor.checkpointTaskGroup (#6206) (#6245)
e8966a7 is described below

commit e8966a7224076b63aaa12c6e258e299838d8502f
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Mon Aug 27 08:01:59 2018 -0700

    Fix NPE in KafkaSupervisor.checkpointTaskGroup (#6206) (#6245)
    
    * Fix NPE in KafkaSupervisor.checkpointTaskGroup
    
    * address comments
    
    * address comment
---
 .../indexing/kafka/supervisor/KafkaSupervisor.java | 154 +++++++++++++--------
 .../indexing/kafka/supervisor/TaskReportData.java  |   2 +-
 2 files changed, 100 insertions(+), 56 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 6386823..431fe0f 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -146,6 +146,8 @@ public class KafkaSupervisor implements Supervisor
    */
   private class TaskGroup
   {
+    final int groupId;
+
     // This specifies the partitions and starting offsets for this task group. It is set on group creation from the data
     // in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in
     // this task group has completed successfully, at which point this will be destroyed and a new task group will be
@@ -161,11 +163,13 @@ public class KafkaSupervisor implements Supervisor
     final String baseSequenceName;
 
     TaskGroup(
+        int groupId,
         ImmutableMap<Integer, Long> partitionOffsets,
         Optional<DateTime> minimumMessageTime,
         Optional<DateTime> maximumMessageTime
     )
     {
+      this.groupId = groupId;
       this.partitionOffsets = partitionOffsets;
       this.minimumMessageTime = minimumMessageTime;
       this.maximumMessageTime = maximumMessageTime;
@@ -187,9 +191,21 @@ public class KafkaSupervisor implements Supervisor
 
   private static class TaskData
   {
+    @Nullable
     volatile TaskStatus status;
+    @Nullable
     volatile DateTime startTime;
     volatile Map<Integer, Long> currentOffsets = new HashMap<>();
+
+    @Override
+    public String toString()
+    {
+      return "TaskData{" +
+             "status=" + status +
+             ", startTime=" + startTime +
+             ", currentOffsets=" + currentOffsets +
+             '}';
+    }
   }
 
   // Map<{group ID}, {actively reading task group}>; see documentation for TaskGroup class
@@ -697,8 +713,8 @@ public class KafkaSupervisor implements Supervisor
           log.info("Already checkpointed with offsets [%s]", checkpoints.lastEntry().getValue());
           return;
         }
-        final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get();
-        taskGroups.get(taskGroupId).addNewCheckpoint(newCheckpoint);
+        final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroup, false).get();
+        taskGroup.addNewCheckpoint(newCheckpoint);
         log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId);
       }
     }
@@ -764,10 +780,13 @@ public class KafkaSupervisor implements Supervisor
                                                       : currentMetadata.getKafkaPartitions()
                                                                        .getPartitionOffsetMap()
                                                                        .get(resetPartitionOffset.getKey());
-          final TaskGroup partitionTaskGroup = taskGroups.get(getTaskGroupIdForPartition(resetPartitionOffset.getKey()));
-          if (partitionOffsetInMetadataStore != null ||
-              (partitionTaskGroup != null && partitionTaskGroup.partitionOffsets.get(resetPartitionOffset.getKey())
-                                                                                .equals(resetPartitionOffset.getValue()))) {
+          final TaskGroup partitionTaskGroup = taskGroups.get(
+              getTaskGroupIdForPartition(resetPartitionOffset.getKey())
+          );
+          final boolean isSameOffset = partitionTaskGroup != null
+                                       && partitionTaskGroup.partitionOffsets.get(resetPartitionOffset.getKey())
+                                                                             .equals(resetPartitionOffset.getValue());
+          if (partitionOffsetInMetadataStore != null || isSameOffset) {
             doReset = true;
             break;
           }
@@ -991,7 +1010,7 @@ public class KafkaSupervisor implements Supervisor
     List<String> futureTaskIds = Lists.newArrayList();
     List<ListenableFuture<Boolean>> futures = Lists.newArrayList();
     List<Task> tasks = taskStorage.getActiveTasks();
-    final Set<Integer> taskGroupsToVerify = new HashSet<>();
+    final Map<Integer, TaskGroup> taskGroupsToVerify = new HashMap<>();
 
     for (Task task : tasks) {
       if (!(task instanceof KafkaIndexTask) || !dataSource.equals(task.getDataSource())) {
@@ -1098,6 +1117,7 @@ public class KafkaSupervisor implements Supervisor
                                 k -> {
                                   log.info("Creating a new task group for taskGroupId[%d]", taskGroupId);
                                   return new TaskGroup(
+                                      taskGroupId,
                                       ImmutableMap.copyOf(
                                           kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap()
                                       ),
@@ -1106,8 +1126,15 @@ public class KafkaSupervisor implements Supervisor
                                   );
                                 }
                             );
-                            taskGroupsToVerify.add(taskGroupId);
-                            taskGroup.tasks.putIfAbsent(taskId, new TaskData());
+                            taskGroupsToVerify.put(taskGroupId, taskGroup);
+                            final TaskData prevTaskGroup = taskGroup.tasks.putIfAbsent(taskId, new TaskData());
+                            if (prevTaskGroup != null) {
+                              throw new ISE(
+                                  "WTH? a taskGroup[%s] already exists for new task[%s]",
+                                  prevTaskGroup,
+                                  taskId
+                              );
+                            }
                           }
                         }
                         return true;
@@ -1135,7 +1162,7 @@ public class KafkaSupervisor implements Supervisor
     log.debug("Found [%d] Kafka indexing tasks for dataSource [%s]", taskCount, dataSource);
 
     // make sure the checkpoints are consistent with each other and with the metadata store
-    taskGroupsToVerify.forEach(this::verifyAndMergeCheckpoints);
+    taskGroupsToVerify.values().forEach(this::verifyAndMergeCheckpoints);
   }
 
   /**
@@ -1145,10 +1172,9 @@ public class KafkaSupervisor implements Supervisor
    * 2. truncates the checkpoints in the taskGroup corresponding to which segments have been published, so that any newly
    * created tasks for the taskGroup start indexing from after the latest published offsets.
    */
-  private void verifyAndMergeCheckpoints(final Integer groupId)
+  private void verifyAndMergeCheckpoints(final TaskGroup taskGroup)
   {
-    final TaskGroup taskGroup = taskGroups.get(groupId);
-
+    final int groupId = taskGroup.groupId;
     // List<TaskId, Map -> {SequenceId, Checkpoints}>
     final List<Pair<String, TreeMap<Integer, Map<Integer, Long>>>> taskSequences = new CopyOnWriteArrayList<>();
     final List<ListenableFuture<TreeMap<Integer, Map<Integer, Long>>>> futures = new ArrayList<>();
@@ -1302,6 +1328,7 @@ public class KafkaSupervisor implements Supervisor
     // reading the minimumMessageTime & maximumMessageTime from the publishing task and setting it here is not necessary as this task cannot
     // change to a state where it will read any more events
     TaskGroup newTaskGroup = new TaskGroup(
+        groupId,
         ImmutableMap.copyOf(startingPartitions),
         Optional.<DateTime>absent(),
         Optional.<DateTime>absent()
@@ -1339,8 +1366,8 @@ public class KafkaSupervisor implements Supervisor
                       }
 
                       taskData.startTime = startTime;
-                      long millisRemaining = ioConfig.getTaskDuration().getMillis() - (System.currentTimeMillis()
-                                                                                       - taskData.startTime.getMillis());
+                      long millisRemaining = ioConfig.getTaskDuration().getMillis() -
+                                             (System.currentTimeMillis() - taskData.startTime.getMillis());
                       if (millisRemaining > 0) {
                         scheduledExec.schedule(
                             buildRunTask(),
@@ -1393,7 +1420,8 @@ public class KafkaSupervisor implements Supervisor
       // find the longest running task from this group
       DateTime earliestTaskStart = DateTimes.nowUtc();
       for (TaskData taskData : group.tasks.values()) {
-        if (earliestTaskStart.isAfter(taskData.startTime)) {
+        // startTime can be null if kafkaSupervisor is stopped gracefully before processing any runNotice
+        if (taskData.startTime != null && earliestTaskStart.isAfter(taskData.startTime)) {
           earliestTaskStart = taskData.startTime;
         }
       }
@@ -1402,7 +1430,7 @@ public class KafkaSupervisor implements Supervisor
       if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
         log.info("Task group [%d] has run for [%s]", groupId, ioConfig.getTaskDuration());
         futureGroupIds.add(groupId);
-        futures.add(checkpointTaskGroup(groupId, true));
+        futures.add(checkpointTaskGroup(group, true));
       }
     }
 
@@ -1440,10 +1468,8 @@ public class KafkaSupervisor implements Supervisor
     }
   }
 
-  private ListenableFuture<Map<Integer, Long>> checkpointTaskGroup(final int groupId, final boolean finalize)
+  private ListenableFuture<Map<Integer, Long>> checkpointTaskGroup(final TaskGroup taskGroup, final boolean finalize)
   {
-    final TaskGroup taskGroup = taskGroups.get(groupId);
-
     if (finalize) {
       // 1) Check if any task completed (in which case we're done) and kill unassigned tasks
       Iterator<Map.Entry<String, TaskData>> i = taskGroup.tasks.entrySet().iterator();
@@ -1452,30 +1478,33 @@ public class KafkaSupervisor implements Supervisor
         String taskId = taskEntry.getKey();
         TaskData task = taskEntry.getValue();
 
-        if (task.status.isSuccess()) {
-          // If any task in this group has already completed, stop the rest of the tasks in the group and return.
-          // This will cause us to create a new set of tasks next cycle that will start from the offsets in
-          // metadata store (which will have advanced if we succeeded in publishing and will remain the same if publishing
-          // failed and we need to re-ingest)
-          return Futures.transform(
-              stopTasksInGroup(taskGroup),
-              new Function<Object, Map<Integer, Long>>()
-              {
-                @Nullable
-                @Override
-                public Map<Integer, Long> apply(@Nullable Object input)
+        // task.status can be null if kafkaSupervisor is stopped gracefully before processing any runNotice.
+        if (task.status != null) {
+          if (task.status.isSuccess()) {
+            // If any task in this group has already completed, stop the rest of the tasks in the group and return.
+            // This will cause us to create a new set of tasks next cycle that will start from the offsets in
+            // metadata store (which will have advanced if we succeeded in publishing and will remain the same if
+            // publishing failed and we need to re-ingest)
+            return Futures.transform(
+                stopTasksInGroup(taskGroup),
+                new Function<Object, Map<Integer, Long>>()
                 {
-                  return null;
+                  @Nullable
+                  @Override
+                  public Map<Integer, Long> apply(@Nullable Object input)
+                  {
+                    return null;
+                  }
                 }
-              }
-          );
-        }
+            );
+          }
 
-        if (task.status.isRunnable()) {
-          if (taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) {
-            log.info("Killing task [%s] which hasn't been assigned to a worker", taskId);
-            killTask(taskId);
-            i.remove();
+          if (task.status.isRunnable()) {
+            if (taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) {
+              log.info("Killing task [%s] which hasn't been assigned to a worker", taskId);
+              killTask(taskId);
+              i.remove();
+            }
           }
         }
       }
@@ -1522,7 +1551,7 @@ public class KafkaSupervisor implements Supervisor
             final List<String> setEndOffsetTaskIds = ImmutableList.copyOf(taskGroup.taskIds());
 
             if (setEndOffsetTaskIds.isEmpty()) {
-              log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", groupId);
+              log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", taskGroup.groupId);
               return null;
             }
 
@@ -1533,11 +1562,15 @@ public class KafkaSupervisor implements Supervisor
                     "Checkpoint [%s] is same as the start offsets [%s] of latest sequence for the task group [%d]",
                     endOffsets,
                     taskGroup.sequenceOffsets.lastEntry().getValue(),
-                    groupId
+                    taskGroup.groupId
                 );
               }
 
-              log.info("Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", groupId, endOffsets);
+              log.info(
+                  "Setting endOffsets for tasks in taskGroup [%d] to %s and resuming",
+                  taskGroup.groupId,
+                  endOffsets
+              );
               for (final String taskId : setEndOffsetTaskIds) {
                 setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, finalize));
               }
@@ -1559,7 +1592,7 @@ public class KafkaSupervisor implements Supervisor
             }
 
             if (taskGroup.tasks.isEmpty()) {
-              log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", groupId);
+              log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", taskGroup.groupId);
               return null;
             }
 
@@ -1599,11 +1632,15 @@ public class KafkaSupervisor implements Supervisor
           continue;
         }
 
-        Iterator<Map.Entry<String, TaskData>> iTask = group.tasks.entrySet().iterator();
+        Iterator<Entry<String, TaskData>> iTask = group.tasks.entrySet().iterator();
         while (iTask.hasNext()) {
-          Map.Entry<String, TaskData> task = iTask.next();
+          final Entry<String, TaskData> entry = iTask.next();
+          final String taskId = entry.getKey();
+          final TaskData taskData = entry.getValue();
+
+          Preconditions.checkNotNull(taskData.status, "WTH? task[%s] has a null status", taskId);
 
-          if (task.getValue().status.isFailure()) {
+          if (taskData.status.isFailure()) {
             iTask.remove(); // remove failed task
             if (group.tasks.isEmpty()) {
               // if all tasks in the group have failed, just nuke all task groups with this partition set and restart
@@ -1612,10 +1649,10 @@ public class KafkaSupervisor implements Supervisor
             }
           }
 
-          if (task.getValue().status.isSuccess()) {
+          if (taskData.status.isSuccess()) {
             // If one of the pending completion tasks was successful, stop the rest of the tasks in the group as
             // we no longer need them to publish their segment.
-            log.info("Task [%s] completed successfully, stopping tasks %s", task.getKey(), group.taskIds());
+            log.info("Task [%s] completed successfully, stopping tasks %s", taskId, group.taskIds());
             futures.add(stopTasksInGroup(group));
             foundSuccess = true;
             toRemove.add(group); // remove the TaskGroup from the list of pending completion task groups
@@ -1686,6 +1723,8 @@ public class KafkaSupervisor implements Supervisor
           continue;
         }
 
+        Preconditions.checkNotNull(taskData.status, "WTH? task[%s] has a null status", taskId);
+
         // remove failed tasks
         if (taskData.status.isFailure()) {
           iTasks.remove();
@@ -1713,7 +1752,7 @@ public class KafkaSupervisor implements Supervisor
     taskGroups.entrySet()
               .stream()
               .filter(taskGroup -> taskGroup.getValue().tasks.size() < ioConfig.getReplicas())
-              .forEach(taskGroup -> verifyAndMergeCheckpoints(taskGroup.getKey()));
+              .forEach(taskGroup -> verifyAndMergeCheckpoints(taskGroup.getValue()));
 
     // check that there is a current task group for each group of partitions in [partitionGroups]
     for (Integer groupId : partitionGroups.keySet()) {
@@ -1729,6 +1768,7 @@ public class KafkaSupervisor implements Supervisor
         ) : Optional.<DateTime>absent());
 
         final TaskGroup taskGroup = new TaskGroup(
+            groupId,
             generateStartingOffsetsForPartitionGroup(groupId),
             minimumMessageTime,
             maximumMessageTime
@@ -1955,8 +1995,12 @@ public class KafkaSupervisor implements Supervisor
 
     final List<ListenableFuture<Void>> futures = Lists.newArrayList();
     for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
-      if (!entry.getValue().status.isComplete()) {
-        futures.add(stopTask(entry.getKey(), false));
+      final String taskId = entry.getKey();
+      final TaskData taskData = entry.getValue();
+      if (taskData.status == null) {
+        killTask(taskId);
+      } else if (!taskData.status.isComplete()) {
+        futures.add(stopTask(taskId, false));
       }
     }
 
@@ -2033,7 +2077,7 @@ public class KafkaSupervisor implements Supervisor
       for (TaskGroup taskGroup : taskGroups.values()) {
         for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
           String taskId = entry.getKey();
-          DateTime startTime = entry.getValue().startTime;
+          @Nullable DateTime startTime = entry.getValue().startTime;
           Map<Integer, Long> currentOffsets = entry.getValue().currentOffsets;
           Long remainingSeconds = null;
           if (startTime != null) {
@@ -2060,7 +2104,7 @@ public class KafkaSupervisor implements Supervisor
         for (TaskGroup taskGroup : taskGroups) {
           for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
             String taskId = entry.getKey();
-            DateTime startTime = entry.getValue().startTime;
+            @Nullable DateTime startTime = entry.getValue().startTime;
             Map<Integer, Long> currentOffsets = entry.getValue().currentOffsets;
             Long remainingSeconds = null;
             if (taskGroup.completionTimeout != null) {
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/TaskReportData.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/TaskReportData.java
index 923f69e..bc127e1 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/TaskReportData.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/TaskReportData.java
@@ -45,7 +45,7 @@ public class TaskReportData
       String id,
       @Nullable Map<Integer, Long> startingOffsets,
       @Nullable Map<Integer, Long> currentOffsets,
-      DateTime startTime,
+      @Nullable DateTime startTime,
       Long remainingSeconds,
       TaskType type,
       @Nullable Map<Integer, Long> lag


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org