You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2019/04/05 23:12:44 UTC

[GitHub] [incubator-druid] jon-wei commented on a change in pull request #7212: Support Kafka supervisor adopting running tasks between versions

jon-wei commented on a change in pull request #7212: Support Kafka supervisor adopting running tasks between versions 
URL: https://github.com/apache/incubator-druid/pull/7212#discussion_r272767903
 
 

 ##########
 File path: indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 ##########
 @@ -1630,30 +1700,48 @@ public Void apply(@Nullable Boolean result)
     );
   }
 
-  private boolean isTaskCurrent(int taskGroupId, String taskId)
+  @VisibleForTesting
+  public boolean isTaskCurrent(int taskGroupId, String taskId)
   {
     Optional<Task> taskOptional = taskStorage.getTask(taskId);
     if (!taskOptional.isPresent() || !doesTaskTypeMatchSupervisor(taskOptional.get())) {
       return false;
     }
 
     @SuppressWarnings("unchecked")
-    SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType> task = (SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>) taskOptional
-        .get();
+    SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType> task =
+        (SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>) taskOptional.get();
+
+    // We recompute the sequence name hash for the supervisor's own configuration and compare this to the hash created
+    // by rehashing the task's sequence name using the most up-to-date class definitions of tuning config and
+    // data schema. Recomputing both ensures that forwards-compatible tasks won't be killed (which would occur
+    // if the hash generated using the old class definitions was used).
+    String taskSequenceName = generateSequenceName(
+        task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap(),
+        task.getIOConfig().getMinimumMessageTime(),
+        task.getIOConfig().getMaximumMessageTime(),
+        task.getDataSchema(),
+        task.getTuningConfig()
+    );
 
-    String taskSequenceName = task.getIOConfig().getBaseSequenceName();
     if (activelyReadingTaskGroups.get(taskGroupId) != null) {
-      return Preconditions
-          .checkNotNull(activelyReadingTaskGroups.get(taskGroupId), "null taskGroup for taskId[%s]", taskGroupId)
-          .baseSequenceName
-          .equals(taskSequenceName);
+      TaskGroup taskGroup = activelyReadingTaskGroups.get(taskGroupId);
 
 Review comment:
   I think this needs a null check

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org