You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/08/27 15:02:01 UTC

[GitHub] fjy closed pull request #6245: [Backport] Fix NPE in KafkaSupervisor.checkpointTaskGroup

fjy closed pull request #6245: [Backport] Fix NPE in KafkaSupervisor.checkpointTaskGroup
URL: https://github.com/apache/incubator-druid/pull/6245
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this is a foreign pull request (from a fork), the diff is supplied
below; GitHub would not display it otherwise:

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 63868230c3d..431fe0f4d1a 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -146,6 +146,8 @@
    */
   private class TaskGroup
   {
+    final int groupId;
+
     // This specifies the partitions and starting offsets for this task group. It is set on group creation from the data
     // in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in
     // this task group has completed successfully, at which point this will be destroyed and a new task group will be
@@ -161,11 +163,13 @@
     final String baseSequenceName;
 
     TaskGroup(
+        int groupId,
         ImmutableMap<Integer, Long> partitionOffsets,
         Optional<DateTime> minimumMessageTime,
         Optional<DateTime> maximumMessageTime
     )
     {
+      this.groupId = groupId;
       this.partitionOffsets = partitionOffsets;
       this.minimumMessageTime = minimumMessageTime;
       this.maximumMessageTime = maximumMessageTime;
@@ -187,9 +191,21 @@ int addNewCheckpoint(Map<Integer, Long> checkpoint)
 
   private static class TaskData
   {
+    @Nullable // can be null if kafkaSupervisor is stopped gracefully before processing any runNotice
     volatile TaskStatus status;
+    @Nullable // can be null if kafkaSupervisor is stopped gracefully before processing any runNotice
     volatile DateTime startTime;
     volatile Map<Integer, Long> currentOffsets = new HashMap<>();
+
+    @Override
+    public String toString() // rendered in error messages, e.g. the duplicate-task ISE in discoverTasks
+    {
+      return "TaskData{" +
+             "status=" + status +
+             ", startTime=" + startTime +
+             ", currentOffsets=" + currentOffsets +
+             '}';
+    }
   }
 
   // Map<{group ID}, {actively reading task group}>; see documentation for TaskGroup class
@@ -697,8 +713,8 @@ public void handle() throws ExecutionException, InterruptedException, TimeoutExc
           log.info("Already checkpointed with offsets [%s]", checkpoints.lastEntry().getValue());
           return;
         }
-        final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get();
-        taskGroups.get(taskGroupId).addNewCheckpoint(newCheckpoint);
+        final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroup, false).get();
+        taskGroup.addNewCheckpoint(newCheckpoint);
         log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId);
       }
     }
@@ -764,10 +780,13 @@ void resetInternal(DataSourceMetadata dataSourceMetadata)
                                                       : currentMetadata.getKafkaPartitions()
                                                                        .getPartitionOffsetMap()
                                                                        .get(resetPartitionOffset.getKey());
-          final TaskGroup partitionTaskGroup = taskGroups.get(getTaskGroupIdForPartition(resetPartitionOffset.getKey()));
-          if (partitionOffsetInMetadataStore != null ||
-              (partitionTaskGroup != null && partitionTaskGroup.partitionOffsets.get(resetPartitionOffset.getKey())
-                                                                                .equals(resetPartitionOffset.getValue()))) {
+          final TaskGroup partitionTaskGroup = taskGroups.get(
+              getTaskGroupIdForPartition(resetPartitionOffset.getKey())
+          );
+          final boolean isSameOffset = partitionTaskGroup != null
+                                       && partitionTaskGroup.partitionOffsets.get(resetPartitionOffset.getKey())
+                                                                             .equals(resetPartitionOffset.getValue());
+          if (partitionOffsetInMetadataStore != null || isSameOffset) {
             doReset = true;
             break;
           }
@@ -991,7 +1010,7 @@ private void discoverTasks() throws ExecutionException, InterruptedException, Ti
     List<String> futureTaskIds = Lists.newArrayList();
     List<ListenableFuture<Boolean>> futures = Lists.newArrayList();
     List<Task> tasks = taskStorage.getActiveTasks();
-    final Set<Integer> taskGroupsToVerify = new HashSet<>();
+    final Map<Integer, TaskGroup> taskGroupsToVerify = new HashMap<>();
 
     for (Task task : tasks) {
       if (!(task instanceof KafkaIndexTask) || !dataSource.equals(task.getDataSource())) {
@@ -1098,6 +1117,7 @@ public Boolean apply(KafkaIndexTask.Status status)
                                 k -> {
                                   log.info("Creating a new task group for taskGroupId[%d]", taskGroupId);
                                   return new TaskGroup(
+                                      taskGroupId,
                                       ImmutableMap.copyOf(
                                           kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap()
                                       ),
@@ -1106,8 +1126,15 @@ public Boolean apply(KafkaIndexTask.Status status)
                                   );
                                 }
                             );
-                            taskGroupsToVerify.add(taskGroupId);
-                            taskGroup.tasks.putIfAbsent(taskId, new TaskData());
+                            taskGroupsToVerify.put(taskGroupId, taskGroup);
+                            final TaskData prevTaskGroup = taskGroup.tasks.putIfAbsent(taskId, new TaskData());
+                            if (prevTaskGroup != null) {
+                              throw new ISE(
+                                  "WTH? a taskGroup[%s] already exists for new task[%s]",
+                                  prevTaskGroup,
+                                  taskId
+                              );
+                            }
                           }
                         }
                         return true;
@@ -1135,7 +1162,7 @@ public Boolean apply(KafkaIndexTask.Status status)
     log.debug("Found [%d] Kafka indexing tasks for dataSource [%s]", taskCount, dataSource);
 
     // make sure the checkpoints are consistent with each other and with the metadata store
-    taskGroupsToVerify.forEach(this::verifyAndMergeCheckpoints);
+    taskGroupsToVerify.values().forEach(this::verifyAndMergeCheckpoints);
   }
 
   /**
@@ -1145,10 +1172,9 @@ public Boolean apply(KafkaIndexTask.Status status)
    * 2. truncates the checkpoints in the taskGroup corresponding to which segments have been published, so that any newly
    * created tasks for the taskGroup start indexing from after the latest published offsets.
    */
-  private void verifyAndMergeCheckpoints(final Integer groupId)
+  private void verifyAndMergeCheckpoints(final TaskGroup taskGroup)
   {
-    final TaskGroup taskGroup = taskGroups.get(groupId);
-
+    final int groupId = taskGroup.groupId;
     // List<TaskId, Map -> {SequenceId, Checkpoints}>
     final List<Pair<String, TreeMap<Integer, Map<Integer, Long>>>> taskSequences = new CopyOnWriteArrayList<>();
     final List<ListenableFuture<TreeMap<Integer, Map<Integer, Long>>>> futures = new ArrayList<>();
@@ -1302,6 +1328,7 @@ private void addDiscoveredTaskToPendingCompletionTaskGroups(
     // reading the minimumMessageTime & maximumMessageTime from the publishing task and setting it here is not necessary as this task cannot
     // change to a state where it will read any more events
     TaskGroup newTaskGroup = new TaskGroup(
+        groupId,
         ImmutableMap.copyOf(startingPartitions),
         Optional.<DateTime>absent(),
         Optional.<DateTime>absent()
@@ -1339,8 +1366,8 @@ public Boolean apply(@Nullable DateTime startTime)
                       }
 
                       taskData.startTime = startTime;
-                      long millisRemaining = ioConfig.getTaskDuration().getMillis() - (System.currentTimeMillis()
-                                                                                       - taskData.startTime.getMillis());
+                      long millisRemaining = ioConfig.getTaskDuration().getMillis() -
+                                             (System.currentTimeMillis() - taskData.startTime.getMillis());
                       if (millisRemaining > 0) {
                         scheduledExec.schedule(
                             buildRunTask(),
@@ -1393,7 +1420,8 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
       // find the longest running task from this group
       DateTime earliestTaskStart = DateTimes.nowUtc();
       for (TaskData taskData : group.tasks.values()) {
-        if (earliestTaskStart.isAfter(taskData.startTime)) {
+        // startTime can be null if kafkaSupervisor is stopped gracefully before processing any runNotice
+        if (taskData.startTime != null && earliestTaskStart.isAfter(taskData.startTime)) {
           earliestTaskStart = taskData.startTime;
         }
       }
@@ -1402,7 +1430,7 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
       if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
         log.info("Task group [%d] has run for [%s]", groupId, ioConfig.getTaskDuration());
         futureGroupIds.add(groupId);
-        futures.add(checkpointTaskGroup(groupId, true));
+        futures.add(checkpointTaskGroup(group, true));
       }
     }
 
@@ -1440,10 +1468,8 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
     }
   }
 
-  private ListenableFuture<Map<Integer, Long>> checkpointTaskGroup(final int groupId, final boolean finalize)
+  private ListenableFuture<Map<Integer, Long>> checkpointTaskGroup(final TaskGroup taskGroup, final boolean finalize)
   {
-    final TaskGroup taskGroup = taskGroups.get(groupId);
-
     if (finalize) {
       // 1) Check if any task completed (in which case we're done) and kill unassigned tasks
       Iterator<Map.Entry<String, TaskData>> i = taskGroup.tasks.entrySet().iterator();
@@ -1452,30 +1478,33 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
         String taskId = taskEntry.getKey();
         TaskData task = taskEntry.getValue();
 
-        if (task.status.isSuccess()) {
-          // If any task in this group has already completed, stop the rest of the tasks in the group and return.
-          // This will cause us to create a new set of tasks next cycle that will start from the offsets in
-          // metadata store (which will have advanced if we succeeded in publishing and will remain the same if publishing
-          // failed and we need to re-ingest)
-          return Futures.transform(
-              stopTasksInGroup(taskGroup),
-              new Function<Object, Map<Integer, Long>>()
-              {
-                @Nullable
-                @Override
-                public Map<Integer, Long> apply(@Nullable Object input)
+        // task.status can be null if kafkaSupervisor is stopped gracefully before processing any runNotice.
+        if (task.status != null) {
+          if (task.status.isSuccess()) {
+            // If any task in this group has already completed, stop the rest of the tasks in the group and return.
+            // This will cause us to create a new set of tasks next cycle that will start from the offsets in
+            // metadata store (which will have advanced if we succeeded in publishing and will remain the same if
+            // publishing failed and we need to re-ingest)
+            return Futures.transform(
+                stopTasksInGroup(taskGroup),
+                new Function<Object, Map<Integer, Long>>()
                 {
-                  return null;
+                  @Nullable
+                  @Override
+                  public Map<Integer, Long> apply(@Nullable Object input)
+                  {
+                    return null;
+                  }
                 }
-              }
-          );
-        }
+            );
+          }
 
-        if (task.status.isRunnable()) {
-          if (taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) {
-            log.info("Killing task [%s] which hasn't been assigned to a worker", taskId);
-            killTask(taskId);
-            i.remove();
+          if (task.status.isRunnable()) {
+            if (taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) {
+              log.info("Killing task [%s] which hasn't been assigned to a worker", taskId);
+              killTask(taskId);
+              i.remove();
+            }
           }
         }
       }
@@ -1522,7 +1551,7 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
             final List<String> setEndOffsetTaskIds = ImmutableList.copyOf(taskGroup.taskIds());
 
             if (setEndOffsetTaskIds.isEmpty()) {
-              log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", groupId);
+              log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", taskGroup.groupId);
               return null;
             }
 
@@ -1533,11 +1562,15 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
                     "Checkpoint [%s] is same as the start offsets [%s] of latest sequence for the task group [%d]",
                     endOffsets,
                     taskGroup.sequenceOffsets.lastEntry().getValue(),
-                    groupId
+                    taskGroup.groupId
                 );
               }
 
-              log.info("Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", groupId, endOffsets);
+              log.info(
+                  "Setting endOffsets for tasks in taskGroup [%d] to %s and resuming",
+                  taskGroup.groupId,
+                  endOffsets
+              );
               for (final String taskId : setEndOffsetTaskIds) {
                 setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, finalize));
               }
@@ -1559,7 +1592,7 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
             }
 
             if (taskGroup.tasks.isEmpty()) {
-              log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", groupId);
+              log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", taskGroup.groupId);
               return null;
             }
 
@@ -1599,11 +1632,15 @@ private void checkPendingCompletionTasks() throws ExecutionException, Interrupte
           continue;
         }
 
-        Iterator<Map.Entry<String, TaskData>> iTask = group.tasks.entrySet().iterator();
+        Iterator<Entry<String, TaskData>> iTask = group.tasks.entrySet().iterator();
         while (iTask.hasNext()) {
-          Map.Entry<String, TaskData> task = iTask.next();
+          final Entry<String, TaskData> entry = iTask.next();
+          final String taskId = entry.getKey();
+          final TaskData taskData = entry.getValue();
+
+          Preconditions.checkNotNull(taskData.status, "WTH? task[%s] has a null status", taskId);
 
-          if (task.getValue().status.isFailure()) {
+          if (taskData.status.isFailure()) {
             iTask.remove(); // remove failed task
             if (group.tasks.isEmpty()) {
               // if all tasks in the group have failed, just nuke all task groups with this partition set and restart
@@ -1612,10 +1649,10 @@ private void checkPendingCompletionTasks() throws ExecutionException, Interrupte
             }
           }
 
-          if (task.getValue().status.isSuccess()) {
+          if (taskData.status.isSuccess()) {
             // If one of the pending completion tasks was successful, stop the rest of the tasks in the group as
             // we no longer need them to publish their segment.
-            log.info("Task [%s] completed successfully, stopping tasks %s", task.getKey(), group.taskIds());
+            log.info("Task [%s] completed successfully, stopping tasks %s", taskId, group.taskIds());
             futures.add(stopTasksInGroup(group));
             foundSuccess = true;
             toRemove.add(group); // remove the TaskGroup from the list of pending completion task groups
@@ -1686,6 +1723,8 @@ private void checkCurrentTaskState() throws ExecutionException, InterruptedExcep
           continue;
         }
 
+        Preconditions.checkNotNull(taskData.status, "WTH? task[%s] has a null status", taskId);
+
         // remove failed tasks
         if (taskData.status.isFailure()) {
           iTasks.remove();
@@ -1713,7 +1752,7 @@ void createNewTasks() throws JsonProcessingException
     taskGroups.entrySet()
               .stream()
               .filter(taskGroup -> taskGroup.getValue().tasks.size() < ioConfig.getReplicas())
-              .forEach(taskGroup -> verifyAndMergeCheckpoints(taskGroup.getKey()));
+              .forEach(taskGroup -> verifyAndMergeCheckpoints(taskGroup.getValue()));
 
     // check that there is a current task group for each group of partitions in [partitionGroups]
     for (Integer groupId : partitionGroups.keySet()) {
@@ -1729,6 +1768,7 @@ void createNewTasks() throws JsonProcessingException
         ) : Optional.<DateTime>absent());
 
         final TaskGroup taskGroup = new TaskGroup(
+            groupId,
             generateStartingOffsetsForPartitionGroup(groupId),
             minimumMessageTime,
             maximumMessageTime
@@ -1955,8 +1995,12 @@ private boolean isTaskCurrent(int taskGroupId, String taskId)
 
     final List<ListenableFuture<Void>> futures = Lists.newArrayList();
     for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
-      if (!entry.getValue().status.isComplete()) {
-        futures.add(stopTask(entry.getKey(), false));
+      final String taskId = entry.getKey();
+      final TaskData taskData = entry.getValue();
+      if (taskData.status == null) {
+        killTask(taskId);
+      } else if (!taskData.status.isComplete()) {
+        futures.add(stopTask(taskId, false));
       }
     }
 
@@ -2033,7 +2077,7 @@ private KafkaSupervisorReport generateReport(boolean includeOffsets)
       for (TaskGroup taskGroup : taskGroups.values()) {
         for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
           String taskId = entry.getKey();
-          DateTime startTime = entry.getValue().startTime;
+          @Nullable DateTime startTime = entry.getValue().startTime;
           Map<Integer, Long> currentOffsets = entry.getValue().currentOffsets;
           Long remainingSeconds = null;
           if (startTime != null) {
@@ -2060,7 +2104,7 @@ private KafkaSupervisorReport generateReport(boolean includeOffsets)
         for (TaskGroup taskGroup : taskGroups) {
           for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
             String taskId = entry.getKey();
-            DateTime startTime = entry.getValue().startTime;
+            @Nullable DateTime startTime = entry.getValue().startTime;
             Map<Integer, Long> currentOffsets = entry.getValue().currentOffsets;
             Long remainingSeconds = null;
             if (taskGroup.completionTimeout != null) {
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/TaskReportData.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/TaskReportData.java
index 923f69ec401..bc127e17338 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/TaskReportData.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/TaskReportData.java
@@ -45,7 +45,7 @@ public TaskReportData(
       String id,
       @Nullable Map<Integer, Long> startingOffsets,
       @Nullable Map<Integer, Long> currentOffsets,
-      DateTime startTime,
+      @Nullable DateTime startTime,
       Long remainingSeconds,
       TaskType type,
       @Nullable Map<Integer, Long> lag


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org