Posted to dev@druid.apache.org by GitBox <gi...@apache.org> on 2018/07/09 18:22:05 UTC

[GitHub] gianm closed pull request #5972: [Backport] Prevent KafkaSupervisor NPE in generateSequenceName

gianm closed pull request #5972: [Backport] Prevent KafkaSupervisor NPE in generateSequenceName 
URL: https://github.com/apache/incubator-druid/pull/5972
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 7cc79f6a98a..5eb783c43c8 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -143,7 +143,8 @@
    * time, there should only be up to a maximum of [taskCount] actively-reading task groups (tracked in the [taskGroups]
    * map) + zero or more pending-completion task groups (tracked in [pendingCompletionTaskGroups]).
    */
-  private static class TaskGroup
+  @VisibleForTesting
+  static class TaskGroup
   {
     // This specifies the partitions and starting offsets for this task group. It is set on group creation from the data
     // in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in
@@ -758,8 +759,8 @@ void resetInternal(DataSourceMetadata dataSourceMetadata)
           resetKafkaMetadata.getKafkaPartitions().getPartitionOffsetMap().keySet().forEach(partition -> {
             final int groupId = getTaskGroupIdForPartition(partition);
             killTaskGroupForPartitions(ImmutableSet.of(partition));
-            sequenceTaskGroup.remove(generateSequenceName(groupId));
-            taskGroups.remove(groupId);
+            final TaskGroup removedGroup = taskGroups.remove(groupId);
+            sequenceTaskGroup.remove(generateSequenceName(removedGroup));
             partitionGroups.get(groupId).replaceAll((partitionId, offset) -> NOT_SET);
           });
         } else {
@@ -867,12 +868,13 @@ String generateSequenceName(
   }
 
   @VisibleForTesting
-  String generateSequenceName(int groupId)
+  String generateSequenceName(TaskGroup taskGroup)
   {
+    Preconditions.checkNotNull(taskGroup, "taskGroup cannot be null");
     return generateSequenceName(
-        taskGroups.get(groupId).partitionOffsets,
-        taskGroups.get(groupId).minimumMessageTime,
-        taskGroups.get(groupId).maximumMessageTime
+        taskGroup.partitionOffsets,
+        taskGroup.minimumMessageTime,
+        taskGroup.maximumMessageTime
     );
   }
 
@@ -1066,18 +1068,19 @@ public Boolean apply(KafkaIndexTask.Status status)
                             }
                             return false;
                           } else {
+                            final TaskGroup taskGroup = new TaskGroup(
+                                ImmutableMap.copyOf(
+                                    kafkaTask.getIOConfig()
+                                             .getStartPartitions()
+                                             .getPartitionOffsetMap()
+                                ), kafkaTask.getIOConfig().getMinimumMessageTime(),
+                                kafkaTask.getIOConfig().getMaximumMessageTime()
+                            );
                             if (taskGroups.putIfAbsent(
                                 taskGroupId,
-                                new TaskGroup(
-                                    ImmutableMap.copyOf(
-                                        kafkaTask.getIOConfig()
-                                                 .getStartPartitions()
-                                                 .getPartitionOffsetMap()
-                                    ), kafkaTask.getIOConfig().getMinimumMessageTime(),
-                                    kafkaTask.getIOConfig().getMaximumMessageTime()
-                                )
+                                taskGroup
                             ) == null) {
-                              sequenceTaskGroup.put(generateSequenceName(taskGroupId), taskGroups.get(taskGroupId));
+                              sequenceTaskGroup.put(generateSequenceName(taskGroup), taskGroups.get(taskGroupId));
                               log.info("Created new task group [%d]", taskGroupId);
                             }
                             taskGroupsToVerify.add(taskGroupId);
@@ -1234,7 +1237,7 @@ public void onFailure(Throwable t)
       // killing all tasks or no task left in the group ?
       // clear state about the taskgroup so that get latest offset information is fetched from metadata store
       log.warn("Clearing task group [%d] information as no valid tasks left the group", groupId);
-      sequenceTaskGroup.remove(generateSequenceName(groupId));
+      sequenceTaskGroup.remove(generateSequenceName(taskGroup));
       taskGroups.remove(groupId);
       partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
     }
@@ -1410,7 +1413,7 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
         partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
       }
 
-      sequenceTaskGroup.remove(generateSequenceName(groupId));
+      sequenceTaskGroup.remove(generateSequenceName(group));
       // remove this task group from the list of current task groups now that it has been handled
       taskGroups.remove(groupId);
     }
@@ -1612,8 +1615,7 @@ private void checkPendingCompletionTasks() throws ExecutionException, Interrupte
 
           // reset partitions offsets for this task group so that they will be re-read from metadata storage
           partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
-          sequenceTaskGroup.remove(generateSequenceName(groupId));
-
+          sequenceTaskGroup.remove(generateSequenceName(group));
           // kill all the tasks in this pending completion group
           killTasksInGroup(group);
           // set a flag so the other pending completion groups for this set of partitions will also stop
@@ -1673,7 +1675,7 @@ private void checkCurrentTaskState() throws ExecutionException, InterruptedExcep
         // be recreated with the next set of offsets
         if (taskData.status.isSuccess()) {
           futures.add(stopTasksInGroup(taskGroup));
-          sequenceTaskGroup.remove(generateSequenceName(groupId));
+          sequenceTaskGroup.remove(generateSequenceName(taskGroup));
           iTaskGroups.remove();
           break;
         }
@@ -1706,15 +1708,16 @@ void createNewTasks() throws JsonProcessingException
             DateTimes.nowUtc().plus(ioConfig.getTaskDuration()).plus(ioConfig.getEarlyMessageRejectionPeriod().get())
         ) : Optional.<DateTime>absent());
 
+        final TaskGroup taskGroup = new TaskGroup(
+            generateStartingOffsetsForPartitionGroup(groupId),
+            minimumMessageTime,
+            maximumMessageTime
+        );
         taskGroups.put(
             groupId,
-            new TaskGroup(
-                generateStartingOffsetsForPartitionGroup(groupId),
-                minimumMessageTime,
-                maximumMessageTime
-            )
+            taskGroup
         );
-        sequenceTaskGroup.put(generateSequenceName(groupId), taskGroups.get(groupId));
+        sequenceTaskGroup.put(generateSequenceName(taskGroup), taskGroups.get(groupId));
       }
     }
 
@@ -1749,8 +1752,8 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc
     for (Integer partition : startPartitions.keySet()) {
       endPartitions.put(partition, Long.MAX_VALUE);
     }
-
-    String sequenceName = generateSequenceName(groupId);
+    TaskGroup group = taskGroups.get(groupId);
+    String sequenceName = generateSequenceName(group);
 
     Map<String, String> consumerProperties = Maps.newHashMap(ioConfig.getConsumerProperties());
     DateTime minimumMessageTime = taskGroups.get(groupId).minimumMessageTime.orNull();
@@ -1911,7 +1914,7 @@ private boolean isTaskCurrent(int taskGroupId, String taskId)
 
     String taskSequenceName = ((KafkaIndexTask) taskOptional.get()).getIOConfig().getBaseSequenceName();
     if (taskGroups.get(taskGroupId) != null) {
-      return generateSequenceName(taskGroupId).equals(taskSequenceName);
+      return generateSequenceName(taskGroups.get(taskGroupId)).equals(taskSequenceName);
     } else {
       return generateSequenceName(
           ((KafkaIndexTask) taskOptional.get()).getIOConfig()
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 29041e27de5..3c2b1989571 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -2087,12 +2087,6 @@ public TestableKafkaSupervisor(
       super(taskStorage, taskMaster, indexerMetadataStorageCoordinator, taskClientFactory, mapper, spec);
     }
 
-    @Override
-    protected String generateSequenceName(int groupId)
-    {
-      return StringUtils.format("sequenceName-%d", groupId);
-    }
-
     @Override
     protected String generateSequenceName(
         Map<Integer, Long> startPartitions,
@@ -2100,7 +2094,8 @@ protected String generateSequenceName(
         Optional<DateTime> maximumMessageTime
     )
     {
-      return generateSequenceName(getTaskGroupIdForPartition(startPartitions.keySet().iterator().next()));
+      final int groupId = getTaskGroupIdForPartition(startPartitions.keySet().iterator().next());
+      return StringUtils.format("sequenceName-%d", groupId);
     }
   }
 }
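
For readers of the archive: the change above replaces generateSequenceName(int groupId), which re-read taskGroups.get(groupId) inside the method and could dereference null if the group had already been removed, with generateSequenceName(TaskGroup taskGroup), which takes the group the caller already holds and fails fast on null. Below is a minimal, self-contained Java sketch of that pattern; the class, field, and method names are hypothetical and heavily simplified (and Objects.requireNonNull stands in for Guava's Preconditions.checkNotNull), so this is an illustration of the idea, not the actual KafkaSupervisor code.

    import java.util.Objects;
    import java.util.concurrent.ConcurrentHashMap;

    /**
     * Hypothetical, simplified model of the supervisor state. It only
     * illustrates why passing the TaskGroup instance is safer than
     * re-looking it up by groupId.
     */
    public class SequenceNameSketch
    {
      /** Minimal stand-in for KafkaSupervisor.TaskGroup. */
      static class TaskGroup
      {
        final ConcurrentHashMap<Integer, Long> partitionOffsets;

        TaskGroup(ConcurrentHashMap<Integer, Long> partitionOffsets)
        {
          this.partitionOffsets = partitionOffsets;
        }
      }

      private final ConcurrentHashMap<Integer, TaskGroup> taskGroups = new ConcurrentHashMap<>();
      private final ConcurrentHashMap<String, TaskGroup> sequenceTaskGroup = new ConcurrentHashMap<>();

      // Old shape: the method looks the group up by id on every call. If the
      // group has already been removed from taskGroups, get(groupId) returns
      // null and the chained field access throws a bare NullPointerException.
      String generateSequenceNameById(int groupId)
      {
        return "sequence_" + taskGroups.get(groupId).partitionOffsets.hashCode();
      }

      // New shape (as in the patch): the caller passes the TaskGroup it
      // already holds, so there is no second map lookup that can race with
      // removal, and a null argument fails fast with a descriptive message.
      String generateSequenceName(TaskGroup taskGroup)
      {
        Objects.requireNonNull(taskGroup, "taskGroup cannot be null");
        return "sequence_" + taskGroup.partitionOffsets.hashCode();
      }

      // Mirrors the reordering in resetInternal() above: remove the group
      // first and derive the sequence name from the removed instance. The
      // explicit null guard is an addition for this sketch, not part of the
      // original patch.
      void clearGroup(int groupId)
      {
        final TaskGroup removedGroup = taskGroups.remove(groupId);
        if (removedGroup != null) {
          sequenceTaskGroup.remove(generateSequenceName(removedGroup));
        }
      }
    }

The design choice is the same one the patch makes at every call site: code that already has the TaskGroup in hand (for example the value returned by taskGroups.remove(groupId)) passes it directly, instead of triggering a second, potentially racy lookup by groupId.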


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@druid.apache.org
For additional commands, e-mail: dev-help@druid.apache.org