Posted to commits@druid.apache.org by qm...@apache.org on 2018/11/16 07:24:07 UTC

[incubator-druid] branch master updated: optimize loading end offsets of all partitions (#6623)

This is an automated email from the ASF dual-hosted git repository.

qmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c4cb4b4  optimize loading end offsets of all partitions (#6623)
c4cb4b4 is described below

commit c4cb4b490956bed58024f3bacc67efbdd772a13f
Author: hate13 <45...@qq.com>
AuthorDate: Fri Nov 16 15:24:01 2018 +0800

    optimize loading end offsets of all partitions (#6623)
---
 .../apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java   | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index eac54be..1b5872a 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -2291,11 +2291,13 @@ public class KafkaSupervisor implements Supervisor
           .collect(Collectors.toSet());
 
       consumer.assign(topicPartitions);
-      consumer.seekToEnd(topicPartitions);
 
-      latestOffsetsFromKafka = topicPartitions
+      final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
+
+      latestOffsetsFromKafka = endOffsets
+          .entrySet()
           .stream()
-          .collect(Collectors.toMap(TopicPartition::partition, consumer::position));
+          .collect(Collectors.toMap(e -> e.getKey().partition(), Map.Entry::getValue));
     }
   }
 


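The re-keying step in the added lines can be tried in isolation. The following is a minimal sketch, not the Druid implementation: the nested TopicPartition class is a hypothetical stand-in for org.apache.kafka.common.TopicPartition, the class and method names (EndOffsetsSketch, byPartitionId) and the topic name and offsets are made up for illustration, and the hand-built map stands in for the Map<TopicPartition, Long> that consumer.endOffsets(topicPartitions) returns in the diff.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class EndOffsetsSketch {

  // Hypothetical stand-in for org.apache.kafka.common.TopicPartition,
  // so the sketch compiles without the Kafka client on the classpath.
  static final class TopicPartition {
    private final String topic;
    private final int partition;

    TopicPartition(String topic, int partition) {
      this.topic = topic;
      this.partition = partition;
    }

    int partition() {
      return partition;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof TopicPartition)) {
        return false;
      }
      TopicPartition other = (TopicPartition) o;
      return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
      return Objects.hash(topic, partition);
    }
  }

  // Same stream pipeline as the added lines in the diff: re-key a
  // per-TopicPartition offset map by partition number alone.
  static Map<Integer, Long> byPartitionId(Map<TopicPartition, Long> endOffsets) {
    return endOffsets
        .entrySet()
        .stream()
        .collect(Collectors.toMap(e -> e.getKey().partition(), Map.Entry::getValue));
  }

  public static void main(String[] args) {
    // Made-up offsets standing in for the result of consumer.endOffsets(...).
    Map<TopicPartition, Long> endOffsets = new LinkedHashMap<>();
    endOffsets.put(new TopicPartition("example-topic", 0), 120L);
    endOffsets.put(new TopicPartition("example-topic", 1), 95L);

    System.out.println(byPartitionId(endOffsets));
  }
}
```

Collectors.toMap without a merge function throws IllegalStateException on duplicate keys, so this pipeline relies on every partition number in the input being unique, which holds when all entries belong to one topic.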