You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by bi...@apache.org on 2016/12/16 11:01:38 UTC

kylin git commit: KYLIN-2131, fix passing kafka consumer properties to hadoop configuration

Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2131 08b76cb00 -> 44e959992


KYLIN-2131, fix passing kafka consumer properties to hadoop configuration


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/44e95999
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/44e95999
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/44e95999

Branch: refs/heads/KYLIN-2131
Commit: 44e95999219c3f95e80b1825783c92cdcfc6c55b
Parents: 08b76cb
Author: Billy Liu <bi...@apache.org>
Authored: Fri Dec 16 19:01:20 2016 +0800
Committer: Billy Liu <bi...@apache.org>
Committed: Fri Dec 16 19:01:20 2016 +0800

----------------------------------------------------------------------
 .../localmeta/kylin-kafka-consumer.properties   | 19 ----
 .../localmeta/kylin-kafka-consumer.xml          | 27 ++++++
 .../sandbox/kylin-kafka-consumer.properties     | 19 ----
 .../sandbox/kylin-kafka-consumer.xml            | 27 ++++++
 .../kafka/config/KafkaConsumerProperties.java   | 94 ++++++++++++--------
 .../source/kafka/hadoop/KafkaFlatTableJob.java  |  5 +-
 .../source/kafka/hadoop/KafkaInputFormat.java   |  6 +-
 .../kafka/hadoop/KafkaInputRecordReader.java    |  6 +-
 .../config/KafkaConsumerPropertiesTest.java     | 26 +++++-
 9 files changed, 142 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/44e95999/examples/test_case_data/localmeta/kylin-kafka-consumer.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kylin-kafka-consumer.properties b/examples/test_case_data/localmeta/kylin-kafka-consumer.properties
deleted file mode 100644
index d198f4e..0000000
--- a/examples/test_case_data/localmeta/kylin-kafka-consumer.properties
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# for more kafka consumer configs, please refer to http://kafka.apache.org/documentation#consumerconfigs
-session.timeout.ms=30000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/44e95999/examples/test_case_data/localmeta/kylin-kafka-consumer.xml
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kylin-kafka-consumer.xml b/examples/test_case_data/localmeta/kylin-kafka-consumer.xml
new file mode 100644
index 0000000..6da7004
--- /dev/null
+++ b/examples/test_case_data/localmeta/kylin-kafka-consumer.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+ for more kafka consumer configs, please refer to http://kafka.apache.org/documentation#consumerconfigs
+-->
+<configuration>
+    <property>
+        <name>session.timeout.ms</name>
+        <value>30000</value>
+    </property>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/44e95999/examples/test_case_data/sandbox/kylin-kafka-consumer.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin-kafka-consumer.properties b/examples/test_case_data/sandbox/kylin-kafka-consumer.properties
deleted file mode 100644
index d198f4e..0000000
--- a/examples/test_case_data/sandbox/kylin-kafka-consumer.properties
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# for more kafka consumer configs, please refer to http://kafka.apache.org/documentation#consumerconfigs
-session.timeout.ms=30000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/44e95999/examples/test_case_data/sandbox/kylin-kafka-consumer.xml
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin-kafka-consumer.xml b/examples/test_case_data/sandbox/kylin-kafka-consumer.xml
new file mode 100644
index 0000000..6da7004
--- /dev/null
+++ b/examples/test_case_data/sandbox/kylin-kafka-consumer.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+ for more kafka consumer configs, please refer to http://kafka.apache.org/documentation#consumerconfigs
+-->
+<configuration>
+    <property>
+        <name>session.timeout.ms</name>
+        <value>30000</value>
+    </property>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/44e95999/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
index 7a15e63..29589d5 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
@@ -18,37 +18,40 @@
 
 package org.apache.kylin.source.kafka.config;
 
-import org.apache.commons.io.FileUtils;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigCannotInitException;
+import org.apache.kylin.common.util.OptionsHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Properties;
-
 public class KafkaConsumerProperties {
 
+    public static final String KAFKA_CONSUMER_FILE = "kylin-kafka-consumer.xml";
     private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerProperties.class);
-
+    // static cached instances
+    private static KafkaConsumerProperties ENV_INSTANCE = null;
     private volatile Properties properties = new Properties();
 
-    public static final String KAFKA_CONSUMER_PROPERTIES_FILE = "kylin-kafka-consumer.properties";
+    private KafkaConsumerProperties() {
 
-    // static cached instances
-    private static KafkaConsumerProperties ENV_INSTANCE = null;
+    }
 
     public static KafkaConsumerProperties getInstanceFromEnv() {
         synchronized (KafkaConsumerProperties.class) {
             if (ENV_INSTANCE == null) {
                 try {
                     KafkaConsumerProperties config = new KafkaConsumerProperties();
-                    config.properties = loadKafkaConsumerProperties();
+                    config.properties = config.loadKafkaConsumerProperties();
 
                     logger.info("Initialized a new KafkaConsumerProperties from getInstanceFromEnv : " + System.identityHashCode(config));
                     ENV_INSTANCE = config;
@@ -60,50 +63,69 @@ public class KafkaConsumerProperties {
         }
     }
 
-    public Properties getProperties(){
-        Properties result = new Properties();
-        result.putAll(properties);
-        return result;
+    private static File getKafkaConsumerFile(String path) {
+        if (path == null) {
+            return null;
+        }
+
+        return new File(path, KAFKA_CONSUMER_FILE);
     }
 
-    public InputStream getHadoopJobConfInputStream() throws IOException {
-        File kafkaProperties = getKafkaConsumerPropertiesFile();
-        return FileUtils.openInputStream(kafkaProperties);
+    public static Properties getProperties(Configuration configuration) {
+        Properties result = new Properties();
+        for (Iterator<Map.Entry<String, String>> it = configuration.iterator(); it.hasNext();) {
+            Map.Entry<String, String> entry = it.next();
+            String key = entry.getKey();
+            String value = entry.getValue();
+            result.put(key, value);
+        }
+        // TODO: Non-Kafka properties are not filtered out here. This causes no functional
+        // issue, but it produces some annoying logs. Tried to leverage the Kafka API to
+        // find the unused properties, but that API is not open to the public.
+        return result;
     }
 
-    private static Properties loadKafkaConsumerProperties() {
-        File propFile = getKafkaConsumerPropertiesFile();
+    private Properties loadKafkaConsumerProperties() {
+        File propFile = getKafkaConsumerFile();
         if (propFile == null || !propFile.exists()) {
-            logger.error("fail to locate " + KAFKA_CONSUMER_PROPERTIES_FILE);
-            throw new RuntimeException("fail to locate " + KAFKA_CONSUMER_PROPERTIES_FILE);
+            logger.error("fail to locate " + KAFKA_CONSUMER_FILE);
+            throw new RuntimeException("fail to locate " + KAFKA_CONSUMER_FILE);
         }
-        Properties conf = new Properties();
+        Properties properties = new Properties();
         try {
             FileInputStream is = new FileInputStream(propFile);
-            conf.load(is);
+            Configuration conf = new Configuration();
+            conf.addResource(is);
+            properties.putAll(getProperties(conf));
             IOUtils.closeQuietly(is);
 
             File propOverrideFile = new File(propFile.getParentFile(), propFile.getName() + ".override");
             if (propOverrideFile.exists()) {
                 FileInputStream ois = new FileInputStream(propOverrideFile);
                 Properties propOverride = new Properties();
-                propOverride.load(ois);
+                Configuration oconf = new Configuration();
+                oconf.addResource(ois);
+                properties.putAll(getProperties(oconf));
                 IOUtils.closeQuietly(ois);
-                conf.putAll(propOverride);
             }
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
 
-        return conf;
+        return properties;
+    }
+
+    public String getKafkaConsumerHadoopJobConf(){
+        File kafkaConsumerFile = getKafkaConsumerFile();
+        return OptionsHelper.convertToFileURL(kafkaConsumerFile.getAbsolutePath());
     }
 
-    private static File getKafkaConsumerPropertiesFile() {
+    private File getKafkaConsumerFile() {
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         String kylinConfHome = System.getProperty(KylinConfig.KYLIN_CONF);
         if (!StringUtils.isEmpty(kylinConfHome)) {
             logger.info("Use KYLIN_CONF=" + kylinConfHome);
-            return getKafkaConsumerPropertiesFile(kylinConfHome);
+            return getKafkaConsumerFile(kylinConfHome);
         }
 
         logger.warn("KYLIN_CONF property was not set, will seek KYLIN_HOME env variable");
@@ -113,14 +135,12 @@ public class KafkaConsumerProperties {
             throw new KylinConfigCannotInitException("Didn't find KYLIN_CONF or KYLIN_HOME, please set one of them");
 
         String path = kylinHome + File.separator + "conf";
-        return getKafkaConsumerPropertiesFile(path);
+        return getKafkaConsumerFile(path);
     }
 
-    private static File getKafkaConsumerPropertiesFile(String path) {
-        if (path == null) {
-            return null;
-        }
-
-        return new File(path, KAFKA_CONSUMER_PROPERTIES_FILE);
+    public Properties getProperties() {
+        Properties prop = new Properties();
+        prop.putAll(this.properties);
+        return prop;
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/44e95999/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
index e030719..40f12ee 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -61,7 +61,6 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
     public static final String CONFIG_KAFKA_CONSUMER_GROUP = "kafka.consumer.group";
     public static final String CONFIG_KAFKA_INPUT_FORMAT = "input.format";
     public static final String CONFIG_KAFKA_PARSER_NAME = "kafka.parser.name";
-    public static final String CONFIG_KAFKA_CONSUMER_PROPERTIES = "kafka.consumer.properties";
 
     @Override
     public int run(String[] args) throws Exception {
@@ -102,8 +101,8 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
 
             JobEngineConfig jobEngineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
             job.getConfiguration().addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null)));
-            KafkaConsumerProperties kafkaFileConfig = KafkaConsumerProperties.getInstanceFromEnv();
-            job.getConfiguration().addResource(kafkaFileConfig.getHadoopJobConfInputStream(), CONFIG_KAFKA_CONSUMER_PROPERTIES);
+            KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv();
+            job.getConfiguration().addResource(new Path(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf()));
             job.getConfiguration().set(CONFIG_KAFKA_BROKERS, brokers);
             job.getConfiguration().set(CONFIG_KAFKA_TOPIC, topic);
             job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));

http://git-wip-us.apache.org/repos/asf/kylin/blob/44e95999/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
index 0aab72e..96bac3f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
@@ -19,7 +19,6 @@
 package org.apache.kylin.source.kafka.hadoop;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -35,6 +34,7 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
 import org.apache.kylin.source.kafka.util.KafkaClient;
 
 import com.google.common.base.Preconditions;
@@ -67,9 +67,7 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
             }
         }
 
-        InputStream inputStream = conf.getConfResourceAsInputStream(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_PROPERTIES);
-        Properties kafkaProperties = new Properties();
-        kafkaProperties.load(inputStream);
+        Properties kafkaProperties = KafkaConsumerProperties.getProperties(conf);
         final List<InputSplit> splits = new ArrayList<InputSplit>();
         try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties)) {
             final List<PartitionInfo> partitionInfos = consumer.partitionsFor(inputTopic);

http://git-wip-us.apache.org/repos/asf/kylin/blob/44e95999/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
index dfd0e59..e8bd76c 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
@@ -19,7 +19,6 @@
 package org.apache.kylin.source.kafka.hadoop;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Properties;
@@ -35,6 +34,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -87,9 +87,7 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
         }
         String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP);
 
-        InputStream inputStream = conf.getConfResourceAsInputStream(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_PROPERTIES);
-        Properties kafkaProperties = new Properties();
-        kafkaProperties.load(inputStream);
+        Properties kafkaProperties = KafkaConsumerProperties.getProperties(conf);
 
         consumer = org.apache.kylin.source.kafka.util.KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/44e95999/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
index 3690439..378ec73 100644
--- a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
+++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
@@ -4,10 +4,24 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Properties;
+
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.xml.sax.SAXException;
 
 public class KafkaConsumerPropertiesTest extends LocalFileMetadataTestCase {
     @Before
@@ -21,11 +35,21 @@ public class KafkaConsumerPropertiesTest extends LocalFileMetadataTestCase {
     }
 
     @Test
-    public void testLoadKafkaConfig() {
+    public void testLoadKafkaProperties() {
         KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv();
         assertFalse(kafkaConsumerProperties.getProperties().containsKey("acks"));
         assertTrue(kafkaConsumerProperties.getProperties().containsKey("session.timeout.ms"));
         assertEquals("30000", kafkaConsumerProperties.getProperties().getProperty("session.timeout.ms"));
     }
 
+    @Test
+    public void testLoadKafkaPropertiesAsHadoopJobConf() throws IOException, ParserConfigurationException, SAXException {
+        KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv();
+        Configuration conf = new Configuration(false);
+        conf.addResource(new FileInputStream(new File(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf())), KafkaConsumerProperties.KAFKA_CONSUMER_FILE);
+        assertEquals("30000", conf.get("session.timeout.ms"));
+
+        Properties prop = KafkaConsumerProperties.getProperties(conf);
+        assertEquals("30000", prop.getProperty("session.timeout.ms"));
+    }
 }