You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/08/16 07:14:04 UTC
[kylin] branch master updated: KYLIN-4133 support override configuration in kafka job
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 73c1a85 KYLIN-4133 support override configuration in kafka job
73c1a85 is described below
commit 73c1a85fe5e3ee2a70592ca98f899b9f41dfc207
Author: chenzhx <ch...@apache.org>
AuthorDate: Tue Aug 13 15:49:45 2019 +0800
KYLIN-4133 support override configuration in kafka job
---
.../source/kafka/hadoop/KafkaFlatTableJob.java | 2 +-
.../kafka/config/KafkaConsumerPropertiesTest.java | 4 ++-
.../src/test/resources/kylin-kafka-consumer.xml | 33 ++++++++++++++++++++++
3 files changed, 37 insertions(+), 2 deletions(-)
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
index 2ac4692..fe15860 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -107,7 +107,6 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
job.getConfiguration().addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null)));
KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv();
job.getConfiguration().addResource(new Path(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf()));
- appendKafkaOverrideProperties(cube.getConfig(), job.getConfiguration());
job.getConfiguration().set(CONFIG_KAFKA_BROKERS, brokers);
job.getConfiguration().set(CONFIG_KAFKA_TOPIC, topic);
job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));
@@ -115,6 +114,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName());
job.getConfiguration().set(CONFIG_KAFKA_SPLIT_ROWS, String.valueOf(kafkaConfig.getSplitRows()));
job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName); // use cubeName as consumer group name
+ appendKafkaOverrideProperties(cube.getConfig(), job.getConfiguration());
setupMapper(cube.getSegmentById(segmentId));
job.setNumReduceTasks(0);
FileOutputFormat.setOutputPath(job, output);
diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
index 75cfc2d..30062ea 100644
--- a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
+++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
@@ -39,7 +39,7 @@ import org.xml.sax.SAXException;
public class KafkaConsumerPropertiesTest extends LocalFileMetadataTestCase {
@Before
public void setUp() throws Exception {
- this.createTestMetadata();
+ this.createTestMetadata("src/test/resources");
}
@After
@@ -53,6 +53,8 @@ public class KafkaConsumerPropertiesTest extends LocalFileMetadataTestCase {
assertFalse(kafkaConsumerProperties.extractKafkaConfigToProperties().containsKey("acks"));
assertTrue(kafkaConsumerProperties.extractKafkaConfigToProperties().containsKey("session.timeout.ms"));
assertEquals("10000", kafkaConsumerProperties.extractKafkaConfigToProperties().getProperty("session.timeout.ms"));
+ assertTrue(kafkaConsumerProperties.extractKafkaConfigToProperties().containsKey("client.id"));
+ assertEquals("kylin", kafkaConsumerProperties.extractKafkaConfigToProperties().getProperty("client.id"));
}
@Test
diff --git a/source-kafka/src/test/resources/kylin-kafka-consumer.xml b/source-kafka/src/test/resources/kylin-kafka-consumer.xml
new file mode 100644
index 0000000..8c2a34b
--- /dev/null
+++ b/source-kafka/src/test/resources/kylin-kafka-consumer.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ for more kafka consumer configs, please refer to http://kafka.apache.org/documentation#consumerconfigs
+-->
+<configuration>
+ <property>
+ <name>session.timeout.ms</name>
+ <value>10000</value>
+ </property>
+ <property>
+ <name>request.timeout.ms</name>
+ <value>20000</value>
+ </property>
+ <property>
+ <name>client.id</name>
+ <value>kylin</value>
+ </property>
+</configuration>
\ No newline at end of file