You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by bi...@apache.org on 2016/12/17 08:42:19 UTC
kylin git commit: KYLIN-2131, fix KafkaClient depends on local config file
Repository: kylin
Updated Branches:
refs/heads/KYLIN-2131 a0c734d44 -> 6e1834c0d
KYLIN-2131, fix KafkaClient depends on local config file
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/6e1834c0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/6e1834c0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/6e1834c0
Branch: refs/heads/KYLIN-2131
Commit: 6e1834c0d077d25a2634d6b60f51871f03b5ebb5
Parents: a0c734d
Author: Billy Liu <bi...@apache.org>
Authored: Sat Dec 17 16:39:24 2016 +0800
Committer: Billy Liu <bi...@apache.org>
Committed: Sat Dec 17 16:39:24 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/source/kafka/KafkaSource.java | 4 +---
.../org/apache/kylin/source/kafka/util/KafkaClient.java | 9 ++-------
2 files changed, 3 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/6e1834c0/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 6c1ac1f..7a0363f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -35,7 +35,6 @@ import org.apache.kylin.source.ISource;
import org.apache.kylin.source.ReadableTable;
import org.apache.kylin.source.SourcePartition;
import org.apache.kylin.source.kafka.config.KafkaConfig;
-import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
import org.apache.kylin.source.kafka.util.KafkaClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -94,8 +93,7 @@ public class KafkaSource implements ISource {
final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cube.getRootFactTable());
final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
final String topic = kafakaConfig.getTopic();
- final Properties kafkaProperties = KafkaConsumerProperties.getInstanceFromEnv().getProperties();
- try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), kafkaProperties)) {
+ try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
for (PartitionInfo partitionInfo : partitionInfos) {
if (result.getSourcePartitionOffsetStart().containsKey(partitionInfo.partition()) == false) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/6e1834c0/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
index 51339c7..69d7440 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
@@ -29,7 +29,6 @@ import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.source.kafka.config.BrokerConfig;
import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
import org.apache.kylin.source.kafka.config.KafkaConfig;
-import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
import java.util.Arrays;
import java.util.List;
@@ -40,8 +39,6 @@ import java.util.Properties;
*/
public class KafkaClient {
- private static KafkaConsumerProperties kafkaFileConfig = KafkaConsumerProperties.getInstanceFromEnv();
-
public static KafkaConsumer getKafkaConsumer(String brokers, String consumerGroup, Properties properties) {
Properties props = constructDefaultKafkaConsumerProperties(brokers, consumerGroup, properties);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
@@ -106,8 +103,7 @@ public class KafkaClient {
final String topic = kafakaConfig.getTopic();
Map<Integer, Long> startOffsets = Maps.newHashMap();
- Properties kafkaProperties = kafkaFileConfig.getProperties();
- try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), kafkaProperties)) {
+ try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) {
final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
for (PartitionInfo partitionInfo : partitionInfos) {
long latest = getLatestOffset(consumer, topic, partitionInfo.partition());
@@ -125,8 +121,7 @@ public class KafkaClient {
final String topic = kafakaConfig.getTopic();
Map<Integer, Long> startOffsets = Maps.newHashMap();
- Properties kafkaProperties = kafkaFileConfig.getProperties();
- try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), kafkaProperties)) {
+ try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) {
final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
for (PartitionInfo partitionInfo : partitionInfos) {
long latest = getEarliestOffset(consumer, topic, partitionInfo.partition());