You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by bi...@apache.org on 2016/12/22 07:02:08 UTC

kylin git commit: KYLIN-2296 support cube level kafka config overwrite

Repository: kylin
Updated Branches:
  refs/heads/master 78a591798 -> 642981746


KYLIN-2296 support cube level kafka config overwrite


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/64298174
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/64298174
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/64298174

Branch: refs/heads/master
Commit: 642981746b0db15a66a854fdd45876bf685667b5
Parents: 78a5917
Author: Billy Liu <bi...@apache.org>
Authored: Thu Dec 22 15:02:01 2016 +0800
Committer: Billy Liu <bi...@apache.org>
Committed: Thu Dec 22 15:02:01 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  8 +++++++
 .../kylin/rest/controller/CubeController.java   |  2 +-
 .../apache/kylin/source/kafka/KafkaSource.java  |  8 +++----
 .../kafka/config/KafkaConsumerProperties.java   |  8 +++----
 .../source/kafka/hadoop/KafkaFlatTableJob.java  | 25 ++++++++++++++------
 .../source/kafka/hadoop/KafkaInputFormat.java   |  2 +-
 .../kafka/hadoop/KafkaInputRecordReader.java    |  2 +-
 .../kylin/source/kafka/util/KafkaClient.java    | 14 +++++------
 .../config/KafkaConsumerPropertiesTest.java     |  8 +++----
 9 files changed, 48 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 11e48ac..168e5b5 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -497,6 +497,14 @@ abstract public class KylinConfigBase implements Serializable {
     }
 
     // ============================================================================
+    // SOURCE.KAFKA
+    // ============================================================================
+
+    public Map<String, String> getKafkaConfigOverride() { // Kafka settings selected by the config-override prefix below
+        return getPropertiesByPrefix("kylin.source.kafka.config-override.");
+    }
+
+    // ============================================================================
     // STORAGE.HBASE
     // ============================================================================
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 6e3668b..978f477 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -650,7 +650,7 @@ public class CubeController extends BasicController {
 
         final GeneralResponse response = new GeneralResponse();
         try {
-            final Map<Integer, Long> startOffsets = KafkaClient.getCurrentOffsets(cubeInstance);
+            final Map<Integer, Long> startOffsets = KafkaClient.getLatestOffsets(cubeInstance);
             CubeDesc desc = cubeInstance.getDescriptor();
             desc.setPartitionOffsetStart(startOffsets);
             cubeService.getCubeDescManager().updateCubeDesc(desc);

http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 12f3da2..6689c6e 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -89,9 +89,9 @@ public class KafkaSource implements ISource {
             }
         }
 
-        final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cube.getRootFactTable());
-        final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
-        final String topic = kafakaConfig.getTopic();
+        final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cube.getRootFactTable());
+        final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
+        final String topic = kafkaConfig.getTopic();
         try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
             final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
             for (PartitionInfo partitionInfo : partitionInfos) {
@@ -107,7 +107,7 @@ public class KafkaSource implements ISource {
 
         if (result.getEndOffset() == Long.MAX_VALUE) {
             logger.debug("Seek end offsets from topic");
-            Map<Integer, Long> latestOffsets = KafkaClient.getCurrentOffsets(cube);
+            Map<Integer, Long> latestOffsets = KafkaClient.getLatestOffsets(cube);
             logger.debug("The end offsets are " + latestOffsets);
 
             for (Integer partitionId : latestOffsets.keySet()) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
index 5bc1a82..8f0dd42 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
@@ -75,7 +75,7 @@ public class KafkaConsumerProperties {
         return new File(path, KAFKA_CONSUMER_FILE);
     }
 
-    public static Properties getProperties(Configuration configuration) {
+    public static Properties extractKafkaConfigToProperties(Configuration configuration) {
         Set<String> configNames = new HashSet<String>();
         try {
             configNames = ConsumerConfig.configNames();
@@ -109,7 +109,7 @@ public class KafkaConsumerProperties {
             FileInputStream is = new FileInputStream(propFile);
             Configuration conf = new Configuration();
             conf.addResource(is);
-            properties.putAll(getProperties(conf));
+            properties.putAll(extractKafkaConfigToProperties(conf));
             IOUtils.closeQuietly(is);
 
             File propOverrideFile = new File(propFile.getParentFile(), propFile.getName() + ".override");
@@ -118,7 +118,7 @@ public class KafkaConsumerProperties {
                 Properties propOverride = new Properties();
                 Configuration oconf = new Configuration();
                 oconf.addResource(ois);
-                properties.putAll(getProperties(oconf));
+                properties.putAll(extractKafkaConfigToProperties(oconf));
                 IOUtils.closeQuietly(ois);
             }
         } catch (IOException e) {
@@ -151,7 +151,7 @@ public class KafkaConsumerProperties {
         return getKafkaConsumerFile(path);
     }
 
-    public Properties getProperties() {
+    public Properties extractKafkaConfigToProperties() {
         Properties prop = new Properties();
         prop.putAll(this.properties);
         return prop;

http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
index 40f12ee..f0f48c0 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -18,10 +18,12 @@
 
 package org.apache.kylin.source.kafka.hadoop;
 
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
-import org.apache.kylin.source.kafka.util.KafkaClient;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
 import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
@@ -34,15 +36,14 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
+import org.apache.kylin.source.kafka.util.KafkaClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-
 /**
  * Run a Hadoop Job to process the stream data in kafka;
  * Modified from the kafka-hadoop-loader in https://github.com/amient/kafka-hadoop-loader
@@ -103,6 +104,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
             job.getConfiguration().addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null)));
             KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv();
             job.getConfiguration().addResource(new Path(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf()));
+            appendKafkaOverrideProperties(KylinConfig.getInstanceFromEnv(), job.getConfiguration());
             job.getConfiguration().set(CONFIG_KAFKA_BROKERS, brokers);
             job.getConfiguration().set(CONFIG_KAFKA_TOPIC, topic);
             job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));
@@ -157,6 +159,15 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
         job.setNumReduceTasks(0);
     }
 
+    /** Copies every Kafka override exposed by kylinConfig into conf, passing "kafka" as the source tag of each value. */
+    private static void appendKafkaOverrideProperties(final KylinConfig kylinConfig, Configuration conf) {
+        final Map<String, String> overrides = kylinConfig.getKafkaConfigOverride();
+        // entrySet() avoids a second lookup per key; an empty map simply skips the loop
+        for (Map.Entry<String, String> entry : overrides.entrySet()) {
+            conf.set(entry.getKey(), entry.getValue(), "kafka");
+        }
+    }
+
     public static void main(String[] args) throws Exception {
         KafkaFlatTableJob job = new KafkaFlatTableJob();
         int exitCode = ToolRunner.run(job, args);

http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
index 96bac3f..c996c5f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
@@ -67,7 +67,7 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
             }
         }
 
-        Properties kafkaProperties = KafkaConsumerProperties.getProperties(conf);
+        Properties kafkaProperties = KafkaConsumerProperties.extractKafkaConfigToProperties(conf);
         final List<InputSplit> splits = new ArrayList<InputSplit>();
         try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties)) {
             final List<PartitionInfo> partitionInfos = consumer.partitionsFor(inputTopic);

http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
index e8bd76c..c22c72f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
@@ -87,7 +87,7 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
         }
         String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP);
 
-        Properties kafkaProperties = KafkaConsumerProperties.getProperties(conf);
+        Properties kafkaProperties = KafkaConsumerProperties.extractKafkaConfigToProperties(conf);
 
         consumer = org.apache.kylin.source.kafka.util.KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
index 69d7440..bd8f90e 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
@@ -96,11 +96,11 @@ public class KafkaClient {
         return consumer.position(topicPartition);
     }
 
-    public static Map<Integer, Long> getCurrentOffsets(final CubeInstance cubeInstance) {
-        final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getRootFactTable());
+    public static Map<Integer, Long> getLatestOffsets(final CubeInstance cubeInstance) {
+        final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getRootFactTable());
 
-        final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
-        final String topic = kafakaConfig.getTopic();
+        final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
+        final String topic = kafkaConfig.getTopic();
 
         Map<Integer, Long> startOffsets = Maps.newHashMap();
         try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) {
@@ -115,10 +115,10 @@ public class KafkaClient {
 
 
     public static Map<Integer, Long> getEarliestOffsets(final CubeInstance cubeInstance) {
-        final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getRootFactTable());
+        final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getRootFactTable());
 
-        final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
-        final String topic = kafakaConfig.getTopic();
+        final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
+        final String topic = kafkaConfig.getTopic();
 
         Map<Integer, Long> startOffsets = Maps.newHashMap();
         try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
index 8edb84d..1d7863b 100644
--- a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
+++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
@@ -32,9 +32,9 @@ public class KafkaConsumerPropertiesTest extends LocalFileMetadataTestCase {
     @Test
     public void testLoadKafkaProperties() {
         KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv();
-        assertFalse(kafkaConsumerProperties.getProperties().containsKey("acks"));
-        assertTrue(kafkaConsumerProperties.getProperties().containsKey("session.timeout.ms"));
-        assertEquals("30000", kafkaConsumerProperties.getProperties().getProperty("session.timeout.ms"));
+        assertFalse(kafkaConsumerProperties.extractKafkaConfigToProperties().containsKey("acks"));
+        assertTrue(kafkaConsumerProperties.extractKafkaConfigToProperties().containsKey("session.timeout.ms"));
+        assertEquals("30000", kafkaConsumerProperties.extractKafkaConfigToProperties().getProperty("session.timeout.ms"));
     }
 
     @Test
@@ -44,7 +44,7 @@ public class KafkaConsumerPropertiesTest extends LocalFileMetadataTestCase {
         conf.addResource(new FileInputStream(new File(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf())), KafkaConsumerProperties.KAFKA_CONSUMER_FILE);
         assertEquals("30000", conf.get("session.timeout.ms"));
 
-        Properties prop = KafkaConsumerProperties.getProperties(conf);
+        Properties prop = KafkaConsumerProperties.extractKafkaConfigToProperties(conf);
         assertEquals("30000", prop.getProperty("session.timeout.ms"));
     }
 }