You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/06/11 07:45:16 UTC

[kylin] branch master updated: add overwrite method getKafkaConsumer in KafkaClient (#658)

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new f38d504  add overwrite method getKafkaConsumer in KafkaClient (#658)
f38d504 is described below

commit f38d5046a20d4f9830a10ecc01faf15868375955
Author: liuzx32 <li...@163.com>
AuthorDate: Tue Jun 11 15:45:10 2019 +0800

    add overwrite method getKafkaConsumer in KafkaClient (#658)
    
    * add overwrite method getKafkaConsumer in KafkaClient
---
 .../java/org/apache/kylin/source/kafka/util/KafkaClient.java   | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
index a0cb59a..a781f8a 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
@@ -29,6 +29,7 @@ import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.config.BrokerConfig;
 import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
 
 import java.util.Arrays;
 import java.util.List;
@@ -43,6 +44,11 @@ public class KafkaClient {
         throw new IllegalStateException("Class KafkaClient is an utility class !");
     }
 
+    public static KafkaConsumer getKafkaConsumer(String brokers, String consumerGroup) {
+        Properties properties = KafkaConsumerProperties.getInstanceFromEnv().extractKafkaConfigToProperties();
+        return getKafkaConsumer(brokers, consumerGroup, properties);
+    }
+
     public static KafkaConsumer getKafkaConsumer(String brokers, String consumerGroup, Properties properties) {
         Properties props = constructDefaultKafkaConsumerProperties(brokers, consumerGroup, properties);
         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
@@ -107,7 +113,7 @@ public class KafkaClient {
         final String topic = kafkaConfig.getTopic();
 
         Map<Integer, Long> startOffsets = Maps.newHashMap();
-        try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) {
+        try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName())) {
             final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
             for (PartitionInfo partitionInfo : partitionInfos) {
                 long latest = getLatestOffset(consumer, topic, partitionInfo.partition());
@@ -125,7 +131,7 @@ public class KafkaClient {
         final String topic = kafkaConfig.getTopic();
 
         Map<Integer, Long> startOffsets = Maps.newHashMap();
-        try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) {
+        try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName())) {
             final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
             for (PartitionInfo partitionInfo : partitionInfos) {
                 long latest = getEarliestOffset(consumer, topic, partitionInfo.partition());