Posted to commits@druid.apache.org by ji...@apache.org on 2018/11/17 02:49:57 UTC

[incubator-druid] branch 0.13.0-incubating updated: kafkasupervisor checkpointing bug (#6639)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch 0.13.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.13.0-incubating by this push:
     new 2e6feee  kafkasupervisor checkpointing bug (#6639)
2e6feee is described below

commit 2e6feee13f8d281cef0bb9bd416357d06f69fed5
Author: Joshua Sun <jo...@icloud.com>
AuthorDate: Fri Nov 16 16:09:55 2018 -0800

    kafkasupervisor checkpointing bug (#6639)
---
 .../indexing/kafka/supervisor/KafkaSupervisor.java | 27 ++++++++++++----------
 1 file changed, 15 insertions(+), 12 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index ef47c0a..3df7948 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -665,18 +665,21 @@ public class KafkaSupervisor implements Supervisor
             })
             .findAny()
             .map(Entry::getKey);
-        taskGroupId = maybeGroupId.orElse(
-            pendingCompletionTaskGroups
-                .entrySet()
-                .stream()
-                .filter(entry -> {
-                  final List<TaskGroup> taskGroups = entry.getValue();
-                  return taskGroups.stream().anyMatch(group -> group.baseSequenceName.equals(baseSequenceName));
-                })
-                .findAny()
-                .orElseThrow(() -> new ISE("Cannot find taskGroup for baseSequenceName[%s]", baseSequenceName))
-                .getKey()
-        );
+
+        if (maybeGroupId.isPresent()) {
+          taskGroupId = maybeGroupId.get();
+        } else {
+          taskGroupId = pendingCompletionTaskGroups
+              .entrySet()
+              .stream()
+              .filter(entry -> {
+                final List<TaskGroup> taskGroups = entry.getValue();
+                return taskGroups.stream().anyMatch(group -> group.baseSequenceName.equals(baseSequenceName));
+              })
+              .findAny()
+              .orElseThrow(() -> new ISE("Cannot find taskGroup for baseSequenceName[%s]", baseSequenceName))
+              .getKey();
+        }
       } else {
         taskGroupId = nullableTaskGroupId;
       }
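
Note on the change above: the commit message does not spell out the cause, but the rewrite is consistent with a well-known property of java.util.Optional. The argument to orElse(...) is an ordinary method argument, so it is evaluated before orElse is called, even when the Optional already holds a value. In the old code that argument ended in orElseThrow(...), so the ISE could presumably be raised even when maybeGroupId was present. The following is a minimal, self-contained sketch of that behavior and not the Druid code itself; the class OrElseEagerDemo and the methods fallbackLookup, eager and lazy are hypothetical names introduced only for illustration.

```java
import java.util.Optional;

class OrElseEagerDemo {
    // Hypothetical stand-in for the lookup in pendingCompletionTaskGroups:
    // it always fails, like orElseThrow(...) when no matching entry exists.
    static Integer fallbackLookup() {
        throw new IllegalStateException("Cannot find taskGroup");
    }

    // Old shape: fallbackLookup() is evaluated before orElse(...) is invoked,
    // so the exception is thrown even if maybeGroupId holds a value.
    static Integer eager(Optional<Integer> maybeGroupId) {
        return maybeGroupId.orElse(fallbackLookup());
    }

    // New shape: the fallback only runs when maybeGroupId is empty.
    static Integer lazy(Optional<Integer> maybeGroupId) {
        if (maybeGroupId.isPresent()) {
            return maybeGroupId.get();
        } else {
            return fallbackLookup();
        }
    }

    public static void main(String[] args) {
        Optional<Integer> present = Optional.of(3);

        // The lazy variant returns the value that is already present.
        System.out.println("lazy: " + lazy(present));

        // The eager variant throws despite the value being present.
        try {
            eager(present);
            System.out.println("eager: returned normally");
        } catch (IllegalStateException e) {
            System.out.println("eager: threw " + e.getMessage());
        }
    }
}
```

Optional.orElseGet(Supplier) would defer the fallback in the same way; the commit uses an explicit if/else instead, which has the same effect.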

