You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/06/11 07:45:16 UTC
[kylin] branch master updated: add overwrite method getKafkaConsumer in KafkaClient (#658)
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new f38d504 add overwrite method getKafkaConsumer in KafkaClient (#658)
f38d504 is described below
commit f38d5046a20d4f9830a10ecc01faf15868375955
Author: liuzx32 <li...@163.com>
AuthorDate: Tue Jun 11 15:45:10 2019 +0800
add overwrite method getKafkaConsumer in KafkaClient (#658)
* add overwrite method getKafkaConsumer in KafkaClient
---
.../java/org/apache/kylin/source/kafka/util/KafkaClient.java | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
index a0cb59a..a781f8a 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
@@ -29,6 +29,7 @@ import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.source.kafka.config.BrokerConfig;
import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
import java.util.Arrays;
import java.util.List;
@@ -43,6 +44,11 @@ public class KafkaClient {
throw new IllegalStateException("Class KafkaClient is an utility class !");
}
+ public static KafkaConsumer getKafkaConsumer(String brokers, String consumerGroup) {
+ Properties properties = KafkaConsumerProperties.getInstanceFromEnv().extractKafkaConfigToProperties();
+ return getKafkaConsumer(brokers, consumerGroup, properties);
+ }
+
public static KafkaConsumer getKafkaConsumer(String brokers, String consumerGroup, Properties properties) {
Properties props = constructDefaultKafkaConsumerProperties(brokers, consumerGroup, properties);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
@@ -107,7 +113,7 @@ public class KafkaClient {
final String topic = kafkaConfig.getTopic();
Map<Integer, Long> startOffsets = Maps.newHashMap();
- try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) {
+ try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName())) {
final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
for (PartitionInfo partitionInfo : partitionInfos) {
long latest = getLatestOffset(consumer, topic, partitionInfo.partition());
@@ -125,7 +131,7 @@ public class KafkaClient {
final String topic = kafkaConfig.getTopic();
Map<Integer, Long> startOffsets = Maps.newHashMap();
- try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) {
+ try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName())) {
final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
for (PartitionInfo partitionInfo : partitionInfos) {
long latest = getEarliestOffset(consumer, topic, partitionInfo.partition());