You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by bi...@apache.org on 2016/12/17 08:42:19 UTC

kylin git commit: KYLIN-2131, fix KafkaClient's dependency on a local config file

Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2131 a0c734d44 -> 6e1834c0d


KYLIN-2131, fix KafkaClient's dependency on a local config file


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/6e1834c0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/6e1834c0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/6e1834c0

Branch: refs/heads/KYLIN-2131
Commit: 6e1834c0d077d25a2634d6b60f51871f03b5ebb5
Parents: a0c734d
Author: Billy Liu <bi...@apache.org>
Authored: Sat Dec 17 16:39:24 2016 +0800
Committer: Billy Liu <bi...@apache.org>
Committed: Sat Dec 17 16:39:24 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/source/kafka/KafkaSource.java     | 4 +---
 .../org/apache/kylin/source/kafka/util/KafkaClient.java     | 9 ++-------
 2 files changed, 3 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/6e1834c0/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 6c1ac1f..7a0363f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -35,7 +35,6 @@ import org.apache.kylin.source.ISource;
 import org.apache.kylin.source.ReadableTable;
 import org.apache.kylin.source.SourcePartition;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
-import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
 import org.apache.kylin.source.kafka.util.KafkaClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -94,8 +93,7 @@ public class KafkaSource implements ISource {
         final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cube.getRootFactTable());
         final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
         final String topic = kafakaConfig.getTopic();
-        final Properties kafkaProperties = KafkaConsumerProperties.getInstanceFromEnv().getProperties();
-        try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), kafkaProperties)) {
+        try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
             final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
             for (PartitionInfo partitionInfo : partitionInfos) {
                 if (result.getSourcePartitionOffsetStart().containsKey(partitionInfo.partition()) == false) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/6e1834c0/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
index 51339c7..69d7440 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
@@ -29,7 +29,6 @@ import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.config.BrokerConfig;
 import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
-import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
 
 import java.util.Arrays;
 import java.util.List;
@@ -40,8 +39,6 @@ import java.util.Properties;
  */
 public class KafkaClient {
 
-    private static KafkaConsumerProperties kafkaFileConfig = KafkaConsumerProperties.getInstanceFromEnv();
-
     public static KafkaConsumer getKafkaConsumer(String brokers, String consumerGroup, Properties properties) {
         Properties props = constructDefaultKafkaConsumerProperties(brokers, consumerGroup, properties);
         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
@@ -106,8 +103,7 @@ public class KafkaClient {
         final String topic = kafakaConfig.getTopic();
 
         Map<Integer, Long> startOffsets = Maps.newHashMap();
-        Properties kafkaProperties = kafkaFileConfig.getProperties();
-        try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), kafkaProperties)) {
+        try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) {
             final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
             for (PartitionInfo partitionInfo : partitionInfos) {
                 long latest = getLatestOffset(consumer, topic, partitionInfo.partition());
@@ -125,8 +121,7 @@ public class KafkaClient {
         final String topic = kafakaConfig.getTopic();
 
         Map<Integer, Long> startOffsets = Maps.newHashMap();
-        Properties kafkaProperties = kafkaFileConfig.getProperties();
-        try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), kafkaProperties)) {
+        try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) {
             final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
             for (PartitionInfo partitionInfo : partitionInfos) {
                 long latest = getEarliestOffset(consumer, topic, partitionInfo.partition());