You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/08/15 13:34:31 UTC
[kylin] 07/11: KYLIN-4115 Always load kafkaConsumerProperties
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 063456b4c8ede083806a4fc79df68cbc4623c5fb
Author: chenzhx <ch...@apache.org>
AuthorDate: Fri Jul 26 18:18:13 2019 +0800
KYLIN-4115 Always load kafkaConsumerProperties
---
.../main/java/org/apache/kylin/source/kafka/KafkaSource.java | 5 ++++-
.../java/org/apache/kylin/source/kafka/util/KafkaClient.java | 10 +++++-----
2 files changed, 9 insertions(+), 6 deletions(-)
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 0680614..3a7182c 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -21,6 +21,7 @@ package org.apache.kylin.source.kafka;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
@@ -44,6 +45,7 @@ import org.apache.kylin.source.ISource;
import org.apache.kylin.source.ISourceMetadataExplorer;
import org.apache.kylin.source.SourcePartition;
import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
import org.apache.kylin.source.kafka.util.KafkaClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -105,7 +107,8 @@ public class KafkaSource implements ISource {
.getKafkaConfig(cube.getRootFactTable());
final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
final String topic = kafkaConfig.getTopic();
- try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
+ Properties property = KafkaConsumerProperties.getInstanceFromEnv().extractKafkaConfigToProperties();
+ try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), property)) {
final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
logger.info("Get {} partitions for topic {} ", partitionInfos.size(), topic);
for (PartitionInfo partitionInfo : partitionInfos) {
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
index a781f8a..e8ce87d 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
@@ -57,16 +57,16 @@ public class KafkaClient {
private static Properties constructDefaultKafkaConsumerProperties(String brokers, String consumerGroup, Properties properties) {
Properties props = new Properties();
- if (properties != null) {
- for (Map.Entry entry : properties.entrySet()) {
- props.put(entry.getKey(), entry.getValue());
- }
- }
props.put("bootstrap.servers", brokers);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("group.id", consumerGroup);
props.put("enable.auto.commit", "false");
+ if (properties != null) {
+ for (Map.Entry entry : properties.entrySet()) {
+ props.put(entry.getKey(), entry.getValue());
+ }
+ }
return props;
}