You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by bi...@apache.org on 2016/12/16 11:01:38 UTC
kylin git commit: KYLIN-2131,
fix passing kafka consumer properties to hadoop configuration
Repository: kylin
Updated Branches:
refs/heads/KYLIN-2131 08b76cb00 -> 44e959992
KYLIN-2131, fix passing kafka consumer properties to hadoop configuration
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/44e95999
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/44e95999
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/44e95999
Branch: refs/heads/KYLIN-2131
Commit: 44e95999219c3f95e80b1825783c92cdcfc6c55b
Parents: 08b76cb
Author: Billy Liu <bi...@apache.org>
Authored: Fri Dec 16 19:01:20 2016 +0800
Committer: Billy Liu <bi...@apache.org>
Committed: Fri Dec 16 19:01:20 2016 +0800
----------------------------------------------------------------------
.../localmeta/kylin-kafka-consumer.properties | 19 ----
.../localmeta/kylin-kafka-consumer.xml | 27 ++++++
.../sandbox/kylin-kafka-consumer.properties | 19 ----
.../sandbox/kylin-kafka-consumer.xml | 27 ++++++
.../kafka/config/KafkaConsumerProperties.java | 94 ++++++++++++--------
.../source/kafka/hadoop/KafkaFlatTableJob.java | 5 +-
.../source/kafka/hadoop/KafkaInputFormat.java | 6 +-
.../kafka/hadoop/KafkaInputRecordReader.java | 6 +-
.../config/KafkaConsumerPropertiesTest.java | 26 +++++-
9 files changed, 142 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/44e95999/examples/test_case_data/localmeta/kylin-kafka-consumer.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kylin-kafka-consumer.properties b/examples/test_case_data/localmeta/kylin-kafka-consumer.properties
deleted file mode 100644
index d198f4e..0000000
--- a/examples/test_case_data/localmeta/kylin-kafka-consumer.properties
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# for more kafka consumer configs, please refer to http://kafka.apache.org/documentation#consumerconfigs
-session.timeout.ms=30000
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/44e95999/examples/test_case_data/localmeta/kylin-kafka-consumer.xml
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kylin-kafka-consumer.xml b/examples/test_case_data/localmeta/kylin-kafka-consumer.xml
new file mode 100644
index 0000000..6da7004
--- /dev/null
+++ b/examples/test_case_data/localmeta/kylin-kafka-consumer.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ for more kafka consumer configs, please refer to http://kafka.apache.org/documentation#consumerconfigs
+-->
+<configuration>
+ <property>
+ <name>session.timeout.ms</name>
+ <value>30000</value>
+ </property>
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/44e95999/examples/test_case_data/sandbox/kylin-kafka-consumer.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin-kafka-consumer.properties b/examples/test_case_data/sandbox/kylin-kafka-consumer.properties
deleted file mode 100644
index d198f4e..0000000
--- a/examples/test_case_data/sandbox/kylin-kafka-consumer.properties
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# for more kafka consumer configs, please refer to http://kafka.apache.org/documentation#consumerconfigs
-session.timeout.ms=30000
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/44e95999/examples/test_case_data/sandbox/kylin-kafka-consumer.xml
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin-kafka-consumer.xml b/examples/test_case_data/sandbox/kylin-kafka-consumer.xml
new file mode 100644
index 0000000..6da7004
--- /dev/null
+++ b/examples/test_case_data/sandbox/kylin-kafka-consumer.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ for more kafka consumer configs, please refer to http://kafka.apache.org/documentation#consumerconfigs
+-->
+<configuration>
+ <property>
+ <name>session.timeout.ms</name>
+ <value>30000</value>
+ </property>
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/44e95999/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
index 7a15e63..29589d5 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
@@ -18,37 +18,40 @@
package org.apache.kylin.source.kafka.config;
-import org.apache.commons.io.FileUtils;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfigCannotInitException;
+import org.apache.kylin.common.util.OptionsHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Properties;
-
public class KafkaConsumerProperties {
+ public static final String KAFKA_CONSUMER_FILE = "kylin-kafka-consumer.xml";
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerProperties.class);
-
+ // static cached instances
+ private static KafkaConsumerProperties ENV_INSTANCE = null;
private volatile Properties properties = new Properties();
- public static final String KAFKA_CONSUMER_PROPERTIES_FILE = "kylin-kafka-consumer.properties";
+ private KafkaConsumerProperties() {
- // static cached instances
- private static KafkaConsumerProperties ENV_INSTANCE = null;
+ }
public static KafkaConsumerProperties getInstanceFromEnv() {
synchronized (KafkaConsumerProperties.class) {
if (ENV_INSTANCE == null) {
try {
KafkaConsumerProperties config = new KafkaConsumerProperties();
- config.properties = loadKafkaConsumerProperties();
+ config.properties = config.loadKafkaConsumerProperties();
logger.info("Initialized a new KafkaConsumerProperties from getInstanceFromEnv : " + System.identityHashCode(config));
ENV_INSTANCE = config;
@@ -60,50 +63,69 @@ public class KafkaConsumerProperties {
}
}
- public Properties getProperties(){
- Properties result = new Properties();
- result.putAll(properties);
- return result;
+ private static File getKafkaConsumerFile(String path) {
+ if (path == null) {
+ return null;
+ }
+
+ return new File(path, KAFKA_CONSUMER_FILE);
}
- public InputStream getHadoopJobConfInputStream() throws IOException {
- File kafkaProperties = getKafkaConsumerPropertiesFile();
- return FileUtils.openInputStream(kafkaProperties);
+ public static Properties getProperties(Configuration configuration) {
+ Properties result = new Properties();
+ for (Iterator<Map.Entry<String, String>> it = configuration.iterator(); it.hasNext();) {
+ Map.Entry<String, String> entry = it.next();
+ String key = entry.getKey();
+ String value = entry.getValue();
+ result.put(key, value);
+ }
+ // TODO: non-Kafka properties are not filtered out here; this causes no issue, but it does
+ // produce some annoying logs. Tried to leverage the Kafka API to find unused properties,
+ // but that API is not open to the public.
+ return result;
}
- private static Properties loadKafkaConsumerProperties() {
- File propFile = getKafkaConsumerPropertiesFile();
+ private Properties loadKafkaConsumerProperties() {
+ File propFile = getKafkaConsumerFile();
if (propFile == null || !propFile.exists()) {
- logger.error("fail to locate " + KAFKA_CONSUMER_PROPERTIES_FILE);
- throw new RuntimeException("fail to locate " + KAFKA_CONSUMER_PROPERTIES_FILE);
+ logger.error("fail to locate " + KAFKA_CONSUMER_FILE);
+ throw new RuntimeException("fail to locate " + KAFKA_CONSUMER_FILE);
}
- Properties conf = new Properties();
+ Properties properties = new Properties();
try {
FileInputStream is = new FileInputStream(propFile);
- conf.load(is);
+ Configuration conf = new Configuration();
+ conf.addResource(is);
+ properties.putAll(getProperties(conf));
IOUtils.closeQuietly(is);
File propOverrideFile = new File(propFile.getParentFile(), propFile.getName() + ".override");
if (propOverrideFile.exists()) {
FileInputStream ois = new FileInputStream(propOverrideFile);
Properties propOverride = new Properties();
- propOverride.load(ois);
+ Configuration oconf = new Configuration();
+ oconf.addResource(ois);
+ properties.putAll(getProperties(oconf));
IOUtils.closeQuietly(ois);
- conf.putAll(propOverride);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
- return conf;
+ return properties;
+ }
+
+ public String getKafkaConsumerHadoopJobConf(){
+ File kafkaConsumerFile = getKafkaConsumerFile();
+ return OptionsHelper.convertToFileURL(kafkaConsumerFile.getAbsolutePath());
}
- private static File getKafkaConsumerPropertiesFile() {
+ private File getKafkaConsumerFile() {
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
String kylinConfHome = System.getProperty(KylinConfig.KYLIN_CONF);
if (!StringUtils.isEmpty(kylinConfHome)) {
logger.info("Use KYLIN_CONF=" + kylinConfHome);
- return getKafkaConsumerPropertiesFile(kylinConfHome);
+ return getKafkaConsumerFile(kylinConfHome);
}
logger.warn("KYLIN_CONF property was not set, will seek KYLIN_HOME env variable");
@@ -113,14 +135,12 @@ public class KafkaConsumerProperties {
throw new KylinConfigCannotInitException("Didn't find KYLIN_CONF or KYLIN_HOME, please set one of them");
String path = kylinHome + File.separator + "conf";
- return getKafkaConsumerPropertiesFile(path);
+ return getKafkaConsumerFile(path);
}
- private static File getKafkaConsumerPropertiesFile(String path) {
- if (path == null) {
- return null;
- }
-
- return new File(path, KAFKA_CONSUMER_PROPERTIES_FILE);
+ public Properties getProperties() {
+ Properties prop = new Properties();
+ prop.putAll(this.properties);
+ return prop;
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/44e95999/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
index e030719..40f12ee 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -61,7 +61,6 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
public static final String CONFIG_KAFKA_CONSUMER_GROUP = "kafka.consumer.group";
public static final String CONFIG_KAFKA_INPUT_FORMAT = "input.format";
public static final String CONFIG_KAFKA_PARSER_NAME = "kafka.parser.name";
- public static final String CONFIG_KAFKA_CONSUMER_PROPERTIES = "kafka.consumer.properties";
@Override
public int run(String[] args) throws Exception {
@@ -102,8 +101,8 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
JobEngineConfig jobEngineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
job.getConfiguration().addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null)));
- KafkaConsumerProperties kafkaFileConfig = KafkaConsumerProperties.getInstanceFromEnv();
- job.getConfiguration().addResource(kafkaFileConfig.getHadoopJobConfInputStream(), CONFIG_KAFKA_CONSUMER_PROPERTIES);
+ KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv();
+ job.getConfiguration().addResource(new Path(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf()));
job.getConfiguration().set(CONFIG_KAFKA_BROKERS, brokers);
job.getConfiguration().set(CONFIG_KAFKA_TOPIC, topic);
job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));
http://git-wip-us.apache.org/repos/asf/kylin/blob/44e95999/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
index 0aab72e..96bac3f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
@@ -19,7 +19,6 @@
package org.apache.kylin.source.kafka.hadoop;
import java.io.IOException;
-import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -35,6 +34,7 @@ import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
+import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
import org.apache.kylin.source.kafka.util.KafkaClient;
import com.google.common.base.Preconditions;
@@ -67,9 +67,7 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
}
}
- InputStream inputStream = conf.getConfResourceAsInputStream(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_PROPERTIES);
- Properties kafkaProperties = new Properties();
- kafkaProperties.load(inputStream);
+ Properties kafkaProperties = KafkaConsumerProperties.getProperties(conf);
final List<InputSplit> splits = new ArrayList<InputSplit>();
try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties)) {
final List<PartitionInfo> partitionInfos = consumer.partitionsFor(inputTopic);
http://git-wip-us.apache.org/repos/asf/kylin/blob/44e95999/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
index dfd0e59..e8bd76c 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
@@ -19,7 +19,6 @@
package org.apache.kylin.source.kafka.hadoop;
import java.io.IOException;
-import java.io.InputStream;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
@@ -35,6 +34,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,9 +87,7 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
}
String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP);
- InputStream inputStream = conf.getConfResourceAsInputStream(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_PROPERTIES);
- Properties kafkaProperties = new Properties();
- kafkaProperties.load(inputStream);
+ Properties kafkaProperties = KafkaConsumerProperties.getProperties(conf);
consumer = org.apache.kylin.source.kafka.util.KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties);
http://git-wip-us.apache.org/repos/asf/kylin/blob/44e95999/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
index 3690439..378ec73 100644
--- a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
+++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
@@ -4,10 +4,24 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Properties;
+
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.xml.sax.SAXException;
public class KafkaConsumerPropertiesTest extends LocalFileMetadataTestCase {
@Before
@@ -21,11 +35,21 @@ public class KafkaConsumerPropertiesTest extends LocalFileMetadataTestCase {
}
@Test
- public void testLoadKafkaConfig() {
+ public void testLoadKafkaProperties() {
KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv();
assertFalse(kafkaConsumerProperties.getProperties().containsKey("acks"));
assertTrue(kafkaConsumerProperties.getProperties().containsKey("session.timeout.ms"));
assertEquals("30000", kafkaConsumerProperties.getProperties().getProperty("session.timeout.ms"));
}
+ @Test
+ public void testLoadKafkaPropertiesAsHadoopJobConf() throws IOException, ParserConfigurationException, SAXException {
+ KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv();
+ Configuration conf = new Configuration(false);
+ conf.addResource(new FileInputStream(new File(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf())), KafkaConsumerProperties.KAFKA_CONSUMER_FILE);
+ assertEquals("30000", conf.get("session.timeout.ms"));
+
+ Properties prop = KafkaConsumerProperties.getProperties(conf);
+ assertEquals("30000", prop.getProperty("session.timeout.ms"));
+ }
}