You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/08/15 13:34:31 UTC

[kylin] 07/11: KYLIN-4115 Always load kafkaConsumerProperties

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 063456b4c8ede083806a4fc79df68cbc4623c5fb
Author: chenzhx <ch...@apache.org>
AuthorDate: Fri Jul 26 18:18:13 2019 +0800

    KYLIN-4115 Always load kafkaConsumerProperties
---
 .../main/java/org/apache/kylin/source/kafka/KafkaSource.java   |  5 ++++-
 .../java/org/apache/kylin/source/kafka/util/KafkaClient.java   | 10 +++++-----
 2 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 0680614..3a7182c 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -21,6 +21,7 @@ package org.apache.kylin.source.kafka;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
@@ -44,6 +45,7 @@ import org.apache.kylin.source.ISource;
 import org.apache.kylin.source.ISourceMetadataExplorer;
 import org.apache.kylin.source.SourcePartition;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
 import org.apache.kylin.source.kafka.util.KafkaClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -105,7 +107,8 @@ public class KafkaSource implements ISource {
                 .getKafkaConfig(cube.getRootFactTable());
         final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
         final String topic = kafkaConfig.getTopic();
-        try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
+        Properties property = KafkaConsumerProperties.getInstanceFromEnv().extractKafkaConfigToProperties();
+        try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), property)) {
             final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
             logger.info("Get {} partitions for topic {} ", partitionInfos.size(), topic);
             for (PartitionInfo partitionInfo : partitionInfos) {
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
index a781f8a..e8ce87d 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
@@ -57,16 +57,16 @@ public class KafkaClient {
 
     private static Properties constructDefaultKafkaConsumerProperties(String brokers, String consumerGroup, Properties properties) {
         Properties props = new Properties();
-        if (properties != null) {
-            for (Map.Entry entry : properties.entrySet()) {
-                props.put(entry.getKey(), entry.getValue());
-            }
-        }
         props.put("bootstrap.servers", brokers);
         props.put("key.deserializer", StringDeserializer.class.getName());
         props.put("value.deserializer", StringDeserializer.class.getName());
         props.put("group.id", consumerGroup);
         props.put("enable.auto.commit", "false");
+        if (properties != null) {
+            for (Map.Entry entry : properties.entrySet()) {
+                props.put(entry.getKey(), entry.getValue());
+            }
+        }
         return props;
     }