You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/08/16 07:14:04 UTC

[kylin] branch master updated: KYLIN-4133 support override configuration in kafka job

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 73c1a85  KYLIN-4133 support override configuration in kafka job
73c1a85 is described below

commit 73c1a85fe5e3ee2a70592ca98f899b9f41dfc207
Author: chenzhx <ch...@apache.org>
AuthorDate: Tue Aug 13 15:49:45 2019 +0800

    KYLIN-4133 support override configuration in kafka job
---
 .../source/kafka/hadoop/KafkaFlatTableJob.java     |  2 +-
 .../kafka/config/KafkaConsumerPropertiesTest.java  |  4 ++-
 .../src/test/resources/kylin-kafka-consumer.xml    | 33 ++++++++++++++++++++++
 3 files changed, 37 insertions(+), 2 deletions(-)

diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
index 2ac4692..fe15860 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -107,7 +107,6 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
             job.getConfiguration().addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null)));
             KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv();
             job.getConfiguration().addResource(new Path(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf()));
-            appendKafkaOverrideProperties(cube.getConfig(), job.getConfiguration());
             job.getConfiguration().set(CONFIG_KAFKA_BROKERS, brokers);
             job.getConfiguration().set(CONFIG_KAFKA_TOPIC, topic);
             job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));
@@ -115,6 +114,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
             job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName());
             job.getConfiguration().set(CONFIG_KAFKA_SPLIT_ROWS, String.valueOf(kafkaConfig.getSplitRows()));
             job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName); // use cubeName as consumer group name
+            appendKafkaOverrideProperties(cube.getConfig(), job.getConfiguration());
             setupMapper(cube.getSegmentById(segmentId));
             job.setNumReduceTasks(0);
             FileOutputFormat.setOutputPath(job, output);
diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
index 75cfc2d..30062ea 100644
--- a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
+++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
@@ -39,7 +39,7 @@ import org.xml.sax.SAXException;
 public class KafkaConsumerPropertiesTest extends LocalFileMetadataTestCase {
     @Before
     public void setUp() throws Exception {
-        this.createTestMetadata();
+        this.createTestMetadata("src/test/resources");
     }
 
     @After
@@ -53,6 +53,8 @@ public class KafkaConsumerPropertiesTest extends LocalFileMetadataTestCase {
         assertFalse(kafkaConsumerProperties.extractKafkaConfigToProperties().containsKey("acks"));
         assertTrue(kafkaConsumerProperties.extractKafkaConfigToProperties().containsKey("session.timeout.ms"));
         assertEquals("10000", kafkaConsumerProperties.extractKafkaConfigToProperties().getProperty("session.timeout.ms"));
+        assertTrue(kafkaConsumerProperties.extractKafkaConfigToProperties().containsKey("client.id"));
+        assertEquals("kylin", kafkaConsumerProperties.extractKafkaConfigToProperties().getProperty("client.id"));
     }
 
     @Test
diff --git a/source-kafka/src/test/resources/kylin-kafka-consumer.xml b/source-kafka/src/test/resources/kylin-kafka-consumer.xml
new file mode 100644
index 0000000..8c2a34b
--- /dev/null
+++ b/source-kafka/src/test/resources/kylin-kafka-consumer.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+ for more kafka consumer configs, please refer to http://kafka.apache.org/documentation#consumerconfigs
+-->
+<configuration>
+    <property>
+        <name>session.timeout.ms</name>
+        <value>10000</value>
+    </property>
+    <property>
+        <name>request.timeout.ms</name>
+        <value>20000</value>
+    </property>
+    <property>
+        <name>client.id</name>
+        <value>kylin</value>
+    </property>
+</configuration>
\ No newline at end of file