You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by bi...@apache.org on 2016/12/22 07:02:08 UTC
kylin git commit: KYLIN-2296 support cube level kafka config overwrite
Repository: kylin
Updated Branches:
refs/heads/master 78a591798 -> 642981746
KYLIN-2296 support cube level kafka config overwrite
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/64298174
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/64298174
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/64298174
Branch: refs/heads/master
Commit: 642981746b0db15a66a854fdd45876bf685667b5
Parents: 78a5917
Author: Billy Liu <bi...@apache.org>
Authored: Thu Dec 22 15:02:01 2016 +0800
Committer: Billy Liu <bi...@apache.org>
Committed: Thu Dec 22 15:02:01 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 8 +++++++
.../kylin/rest/controller/CubeController.java | 2 +-
.../apache/kylin/source/kafka/KafkaSource.java | 8 +++----
.../kafka/config/KafkaConsumerProperties.java | 8 +++----
.../source/kafka/hadoop/KafkaFlatTableJob.java | 25 ++++++++++++++------
.../source/kafka/hadoop/KafkaInputFormat.java | 2 +-
.../kafka/hadoop/KafkaInputRecordReader.java | 2 +-
.../kylin/source/kafka/util/KafkaClient.java | 14 +++++------
.../config/KafkaConsumerPropertiesTest.java | 8 +++----
9 files changed, 48 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 11e48ac..168e5b5 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -497,6 +497,14 @@ abstract public class KylinConfigBase implements Serializable {
}
// ============================================================================
+ // SOURCE.KAFKA
+ // ============================================================================
+
+ public Map<String, String> getKafkaConfigOverride() {
+ return getPropertiesByPrefix("kylin.source.kafka.config-override.");
+ }
+
+ // ============================================================================
// STORAGE.HBASE
// ============================================================================
http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 6e3668b..978f477 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -650,7 +650,7 @@ public class CubeController extends BasicController {
final GeneralResponse response = new GeneralResponse();
try {
- final Map<Integer, Long> startOffsets = KafkaClient.getCurrentOffsets(cubeInstance);
+ final Map<Integer, Long> startOffsets = KafkaClient.getLatestOffsets(cubeInstance);
CubeDesc desc = cubeInstance.getDescriptor();
desc.setPartitionOffsetStart(startOffsets);
cubeService.getCubeDescManager().updateCubeDesc(desc);
http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 12f3da2..6689c6e 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -89,9 +89,9 @@ public class KafkaSource implements ISource {
}
}
- final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cube.getRootFactTable());
- final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
- final String topic = kafakaConfig.getTopic();
+ final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cube.getRootFactTable());
+ final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
+ final String topic = kafkaConfig.getTopic();
try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
for (PartitionInfo partitionInfo : partitionInfos) {
@@ -107,7 +107,7 @@ public class KafkaSource implements ISource {
if (result.getEndOffset() == Long.MAX_VALUE) {
logger.debug("Seek end offsets from topic");
- Map<Integer, Long> latestOffsets = KafkaClient.getCurrentOffsets(cube);
+ Map<Integer, Long> latestOffsets = KafkaClient.getLatestOffsets(cube);
logger.debug("The end offsets are " + latestOffsets);
for (Integer partitionId : latestOffsets.keySet()) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
index 5bc1a82..8f0dd42 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
@@ -75,7 +75,7 @@ public class KafkaConsumerProperties {
return new File(path, KAFKA_CONSUMER_FILE);
}
- public static Properties getProperties(Configuration configuration) {
+ public static Properties extractKafkaConfigToProperties(Configuration configuration) {
Set<String> configNames = new HashSet<String>();
try {
configNames = ConsumerConfig.configNames();
@@ -109,7 +109,7 @@ public class KafkaConsumerProperties {
FileInputStream is = new FileInputStream(propFile);
Configuration conf = new Configuration();
conf.addResource(is);
- properties.putAll(getProperties(conf));
+ properties.putAll(extractKafkaConfigToProperties(conf));
IOUtils.closeQuietly(is);
File propOverrideFile = new File(propFile.getParentFile(), propFile.getName() + ".override");
@@ -118,7 +118,7 @@ public class KafkaConsumerProperties {
Properties propOverride = new Properties();
Configuration oconf = new Configuration();
oconf.addResource(ois);
- properties.putAll(getProperties(oconf));
+ properties.putAll(extractKafkaConfigToProperties(oconf));
IOUtils.closeQuietly(ois);
}
} catch (IOException e) {
@@ -151,7 +151,7 @@ public class KafkaConsumerProperties {
return getKafkaConsumerFile(path);
}
- public Properties getProperties() {
+ public Properties extractKafkaConfigToProperties() {
Properties prop = new Properties();
prop.putAll(this.properties);
return prop;
http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
index 40f12ee..f0f48c0 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -18,10 +18,12 @@
package org.apache.kylin.source.kafka.hadoop;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
-import org.apache.kylin.source.kafka.util.KafkaClient;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
@@ -34,15 +36,14 @@ import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
+import org.apache.kylin.source.kafka.util.KafkaClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-
/**
* Run a Hadoop Job to process the stream data in kafka;
* Modified from the kafka-hadoop-loader in https://github.com/amient/kafka-hadoop-loader
@@ -103,6 +104,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
job.getConfiguration().addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null)));
KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv();
job.getConfiguration().addResource(new Path(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf()));
+ appendKafkaOverrideProperties(KylinConfig.getInstanceFromEnv(), job.getConfiguration());
job.getConfiguration().set(CONFIG_KAFKA_BROKERS, brokers);
job.getConfiguration().set(CONFIG_KAFKA_TOPIC, topic);
job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));
@@ -157,6 +159,15 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
job.setNumReduceTasks(0);
}
+ private static void appendKafkaOverrideProperties(final KylinConfig kylinConfig, Configuration conf) {
+ final Map<String, String> kafkaConfOverride = kylinConfig.getKafkaConfigOverride();
+ if (kafkaConfOverride.isEmpty() == false) {
+ for (String key : kafkaConfOverride.keySet()) {
+ conf.set(key, kafkaConfOverride.get(key), "kafka");
+ }
+ }
+ }
+
public static void main(String[] args) throws Exception {
KafkaFlatTableJob job = new KafkaFlatTableJob();
int exitCode = ToolRunner.run(job, args);
http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
index 96bac3f..c996c5f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
@@ -67,7 +67,7 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
}
}
- Properties kafkaProperties = KafkaConsumerProperties.getProperties(conf);
+ Properties kafkaProperties = KafkaConsumerProperties.extractKafkaConfigToProperties(conf);
final List<InputSplit> splits = new ArrayList<InputSplit>();
try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties)) {
final List<PartitionInfo> partitionInfos = consumer.partitionsFor(inputTopic);
http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
index e8bd76c..c22c72f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
@@ -87,7 +87,7 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
}
String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP);
- Properties kafkaProperties = KafkaConsumerProperties.getProperties(conf);
+ Properties kafkaProperties = KafkaConsumerProperties.extractKafkaConfigToProperties(conf);
consumer = org.apache.kylin.source.kafka.util.KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties);
http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
index 69d7440..bd8f90e 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
@@ -96,11 +96,11 @@ public class KafkaClient {
return consumer.position(topicPartition);
}
- public static Map<Integer, Long> getCurrentOffsets(final CubeInstance cubeInstance) {
- final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getRootFactTable());
+ public static Map<Integer, Long> getLatestOffsets(final CubeInstance cubeInstance) {
+ final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getRootFactTable());
- final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
- final String topic = kafakaConfig.getTopic();
+ final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
+ final String topic = kafkaConfig.getTopic();
Map<Integer, Long> startOffsets = Maps.newHashMap();
try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) {
@@ -115,10 +115,10 @@ public class KafkaClient {
public static Map<Integer, Long> getEarliestOffsets(final CubeInstance cubeInstance) {
- final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getRootFactTable());
+ final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getRootFactTable());
- final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
- final String topic = kafakaConfig.getTopic();
+ final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
+ final String topic = kafkaConfig.getTopic();
Map<Integer, Long> startOffsets = Maps.newHashMap();
try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
index 8edb84d..1d7863b 100644
--- a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
+++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
@@ -32,9 +32,9 @@ public class KafkaConsumerPropertiesTest extends LocalFileMetadataTestCase {
@Test
public void testLoadKafkaProperties() {
KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv();
- assertFalse(kafkaConsumerProperties.getProperties().containsKey("acks"));
- assertTrue(kafkaConsumerProperties.getProperties().containsKey("session.timeout.ms"));
- assertEquals("30000", kafkaConsumerProperties.getProperties().getProperty("session.timeout.ms"));
+ assertFalse(kafkaConsumerProperties.extractKafkaConfigToProperties().containsKey("acks"));
+ assertTrue(kafkaConsumerProperties.extractKafkaConfigToProperties().containsKey("session.timeout.ms"));
+ assertEquals("30000", kafkaConsumerProperties.extractKafkaConfigToProperties().getProperty("session.timeout.ms"));
}
@Test
@@ -44,7 +44,7 @@ public class KafkaConsumerPropertiesTest extends LocalFileMetadataTestCase {
conf.addResource(new FileInputStream(new File(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf())), KafkaConsumerProperties.KAFKA_CONSUMER_FILE);
assertEquals("30000", conf.get("session.timeout.ms"));
- Properties prop = KafkaConsumerProperties.getProperties(conf);
+ Properties prop = KafkaConsumerProperties.extractKafkaConfigToProperties(conf);
assertEquals("30000", prop.getProperty("session.timeout.ms"));
}
}