You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2018/11/17 02:49:57 UTC
[incubator-druid] branch 0.13.0-incubating updated: kafkasupervisor
checkpointing bug (#6639)
This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch 0.13.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.13.0-incubating by this push:
new 2e6feee kafkasupervisor checkpointing bug (#6639)
2e6feee is described below
commit 2e6feee13f8d281cef0bb9bd416357d06f69fed5
Author: Joshua Sun <jo...@icloud.com>
AuthorDate: Fri Nov 16 16:09:55 2018 -0800
kafkasupervisor checkpointing bug (#6639)
---
.../indexing/kafka/supervisor/KafkaSupervisor.java | 27 ++++++++++++----------
1 file changed, 15 insertions(+), 12 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index ef47c0a..3df7948 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -665,18 +665,21 @@ public class KafkaSupervisor implements Supervisor
})
.findAny()
.map(Entry::getKey);
- taskGroupId = maybeGroupId.orElse(
- pendingCompletionTaskGroups
- .entrySet()
- .stream()
- .filter(entry -> {
- final List<TaskGroup> taskGroups = entry.getValue();
- return taskGroups.stream().anyMatch(group -> group.baseSequenceName.equals(baseSequenceName));
- })
- .findAny()
- .orElseThrow(() -> new ISE("Cannot find taskGroup for baseSequenceName[%s]", baseSequenceName))
- .getKey()
- );
+
+ if (maybeGroupId.isPresent()) {
+ taskGroupId = maybeGroupId.get();
+ } else {
+ taskGroupId = pendingCompletionTaskGroups
+ .entrySet()
+ .stream()
+ .filter(entry -> {
+ final List<TaskGroup> taskGroups = entry.getValue();
+ return taskGroups.stream().anyMatch(group -> group.baseSequenceName.equals(baseSequenceName));
+ })
+ .findAny()
+ .orElseThrow(() -> new ISE("Cannot find taskGroup for baseSequenceName[%s]", baseSequenceName))
+ .getKey();
+ }
} else {
taskGroupId = nullableTaskGroupId;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org