You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by bi...@apache.org on 2016/12/18 06:15:50 UTC

[07/11] kylin git commit: KYLIN-2131, load kafka config from kylin-kafka-consumer.properties

KYLIN-2131, load kafka config from kylin-kafka-consumer.properties


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ffca41be
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ffca41be
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ffca41be

Branch: refs/heads/KYLIN-2131
Commit: ffca41be520ed644b1965c8e1872c6acfc7d9a0c
Parents: e14f4e1
Author: Billy Liu <bi...@apache.org>
Authored: Thu Dec 15 19:18:32 2016 +0800
Committer: Billy Liu <bi...@apache.org>
Committed: Sun Dec 18 14:15:13 2016 +0800

----------------------------------------------------------------------
 .../kylin/job/streaming/Kafka10DataLoader.java  |  34 ++++-
 build/conf/kylin-kafka-consumer.properties      |  19 +++
 .../localmeta/kylin-kafka-consumer.properties   |  19 +++
 .../sandbox/kylin-kafka-consumer.properties     |  19 +++
 .../apache/kylin/source/kafka/KafkaSource.java  |   5 +-
 .../source/kafka/config/KafkaClusterConfig.java |   8 --
 .../kylin/source/kafka/config/KafkaConfig.java  |  11 --
 .../kafka/config/KafkaConsumerProperties.java   | 126 +++++++++++++++++++
 .../source/kafka/hadoop/KafkaFlatTableJob.java  |   7 +-
 .../source/kafka/hadoop/KafkaInputFormat.java   |   7 +-
 .../kafka/hadoop/KafkaInputRecordReader.java    |  18 +--
 .../kylin/source/kafka/util/KafkaClient.java    |  43 ++-----
 .../config/KafkaConsumerPropertiesTest.java     |  31 +++++
 webapp/app/js/model/streamingModel.js           |   2 -
 .../partials/cubeDesigner/streamingConfig.html  |  33 +----
 15 files changed, 280 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/ffca41be/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
index 8c548be..c7a487a 100644
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.job.streaming;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import javax.annotation.Nullable;
@@ -35,8 +36,6 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
 
-import org.apache.kylin.source.kafka.util.KafkaClient;
-
 /**
  * Load prepared data into kafka(for test use)
  */
@@ -60,10 +59,7 @@ public class Kafka10DataLoader extends StreamDataLoader {
             }
         }), ",");
 
-        Properties props = new Properties();
-        props.put("acks", "1");
-        props.put("retry.backoff.ms", "1000");
-        KafkaProducer producer = KafkaClient.getKafkaProducer(brokerList, props);
+        KafkaProducer producer = getKafkaProducer(brokerList, null);
 
         for (int i = 0; i < messages.size(); i++) {
             ProducerRecord<String, String> keyedMessage = new ProducerRecord<String, String>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i));
@@ -73,4 +69,30 @@ public class Kafka10DataLoader extends StreamDataLoader {
         producer.close();
     }
 
+    public static KafkaProducer getKafkaProducer(String brokers, Properties properties) {
+        Properties props = constructDefaultKafkaProducerProperties(brokers, properties);
+        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
+        return producer;
+    }
+
+    private static Properties constructDefaultKafkaProducerProperties(String brokers, Properties properties) {
+        Properties props = new Properties();
+        props.put("retry.backoff.ms", "1000");
+        props.put("bootstrap.servers", brokers);
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("acks", "1");
+        props.put("buffer.memory", 33554432);
+        props.put("retries", 0);
+        props.put("batch.size", 16384);
+        props.put("linger.ms", 50);
+        props.put("request.timeout.ms", "30000");
+        if (properties != null) {
+            for (Map.Entry entry : properties.entrySet()) {
+                props.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return props;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ffca41be/build/conf/kylin-kafka-consumer.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin-kafka-consumer.properties b/build/conf/kylin-kafka-consumer.properties
new file mode 100644
index 0000000..d198f4e
--- /dev/null
+++ b/build/conf/kylin-kafka-consumer.properties
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# for more kafka consumer configs, please refer to http://kafka.apache.org/documentation#consumerconfigs
+session.timeout.ms=30000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/ffca41be/examples/test_case_data/localmeta/kylin-kafka-consumer.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kylin-kafka-consumer.properties b/examples/test_case_data/localmeta/kylin-kafka-consumer.properties
new file mode 100644
index 0000000..d198f4e
--- /dev/null
+++ b/examples/test_case_data/localmeta/kylin-kafka-consumer.properties
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# for more kafka consumer configs, please refer to http://kafka.apache.org/documentation#consumerconfigs
+session.timeout.ms=30000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/ffca41be/examples/test_case_data/sandbox/kylin-kafka-consumer.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin-kafka-consumer.properties b/examples/test_case_data/sandbox/kylin-kafka-consumer.properties
new file mode 100644
index 0000000..d198f4e
--- /dev/null
+++ b/examples/test_case_data/sandbox/kylin-kafka-consumer.properties
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# for more kafka consumer configs, please refer to http://kafka.apache.org/documentation#consumerconfigs
+session.timeout.ms=30000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/ffca41be/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index e469f77..1f3c446 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -20,6 +20,7 @@ package org.apache.kylin.source.kafka;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
@@ -34,6 +35,7 @@ import org.apache.kylin.source.ISource;
 import org.apache.kylin.source.ReadableTable;
 import org.apache.kylin.source.SourcePartition;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
 import org.apache.kylin.source.kafka.util.KafkaClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -92,7 +94,8 @@ public class KafkaSource implements ISource {
         final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cube.getRootFactTable());
         final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
         final String topic = kafakaConfig.getTopic();
-        try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
+        final Properties kafkaProperties = KafkaConsumerProperties.getInstanceFromEnv().getProperties();
+        try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), kafkaProperties)) {
             final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
             for (PartitionInfo partitionInfo : partitionInfos) {
                 if (result.getSourcePartitionOffsetStart().containsKey(partitionInfo.partition()) == false) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/ffca41be/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
index 95349c2..3b71189 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
@@ -47,18 +47,10 @@ public class KafkaClusterConfig extends RootPersistentEntity {
     @JsonBackReference
     private KafkaConfig kafkaConfig;
 
-    public int getBufferSize() {
-        return kafkaConfig.getBufferSize();
-    }
-
     public String getTopic() {
         return kafkaConfig.getTopic();
     }
 
-    public int getTimeout() {
-        return kafkaConfig.getTimeout();
-    }
-
     public List<BrokerConfig> getBrokerConfigs() {
         return brokerConfigs;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ffca41be/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
index 157d83c..a096344 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
@@ -55,9 +55,6 @@ public class KafkaConfig extends RootPersistentEntity {
     @JsonProperty("timeout")
     private int timeout;
 
-    @JsonProperty("bufferSize")
-    private int bufferSize;
-
     @JsonProperty("parserName")
     private String parserName;
 
@@ -97,14 +94,6 @@ public class KafkaConfig extends RootPersistentEntity {
         this.timeout = timeout;
     }
 
-    public int getBufferSize() {
-        return bufferSize;
-    }
-
-    public void setBufferSize(int bufferSize) {
-        this.bufferSize = bufferSize;
-    }
-
     public String getTopic() {
         return topic;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ffca41be/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
new file mode 100644
index 0000000..7a15e63
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.kafka.config;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinConfigCannotInitException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+public class KafkaConsumerProperties {
+
+    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerProperties.class);
+
+    private volatile Properties properties = new Properties();
+
+    public static final String KAFKA_CONSUMER_PROPERTIES_FILE = "kylin-kafka-consumer.properties";
+
+    // static cached instances
+    private static KafkaConsumerProperties ENV_INSTANCE = null;
+
+    public static KafkaConsumerProperties getInstanceFromEnv() {
+        synchronized (KafkaConsumerProperties.class) {
+            if (ENV_INSTANCE == null) {
+                try {
+                    KafkaConsumerProperties config = new KafkaConsumerProperties();
+                    config.properties = loadKafkaConsumerProperties();
+
+                    logger.info("Initialized a new KafkaConsumerProperties from getInstanceFromEnv : " + System.identityHashCode(config));
+                    ENV_INSTANCE = config;
+                } catch (IllegalArgumentException e) {
+                    throw new IllegalStateException("Failed to find KafkaConsumerProperties ", e);
+                }
+            }
+            return ENV_INSTANCE;
+        }
+    }
+
+    public Properties getProperties(){
+        Properties result = new Properties();
+        result.putAll(properties);
+        return result;
+    }
+
+    public InputStream getHadoopJobConfInputStream() throws IOException {
+        File kafkaProperties = getKafkaConsumerPropertiesFile();
+        return FileUtils.openInputStream(kafkaProperties);
+    }
+
+    private static Properties loadKafkaConsumerProperties() {
+        File propFile = getKafkaConsumerPropertiesFile();
+        if (propFile == null || !propFile.exists()) {
+            logger.error("fail to locate " + KAFKA_CONSUMER_PROPERTIES_FILE);
+            throw new RuntimeException("fail to locate " + KAFKA_CONSUMER_PROPERTIES_FILE);
+        }
+        Properties conf = new Properties();
+        try {
+            FileInputStream is = new FileInputStream(propFile);
+            conf.load(is);
+            IOUtils.closeQuietly(is);
+
+            File propOverrideFile = new File(propFile.getParentFile(), propFile.getName() + ".override");
+            if (propOverrideFile.exists()) {
+                FileInputStream ois = new FileInputStream(propOverrideFile);
+                Properties propOverride = new Properties();
+                propOverride.load(ois);
+                IOUtils.closeQuietly(ois);
+                conf.putAll(propOverride);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        return conf;
+    }
+
+    private static File getKafkaConsumerPropertiesFile() {
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        String kylinConfHome = System.getProperty(KylinConfig.KYLIN_CONF);
+        if (!StringUtils.isEmpty(kylinConfHome)) {
+            logger.info("Use KYLIN_CONF=" + kylinConfHome);
+            return getKafkaConsumerPropertiesFile(kylinConfHome);
+        }
+
+        logger.warn("KYLIN_CONF property was not set, will seek KYLIN_HOME env variable");
+
+        String kylinHome = kylinConfig.getKylinHome();
+        if (StringUtils.isEmpty(kylinHome))
+            throw new KylinConfigCannotInitException("Didn't find KYLIN_CONF or KYLIN_HOME, please set one of them");
+
+        String path = kylinHome + File.separator + "conf";
+        return getKafkaConsumerPropertiesFile(path);
+    }
+
+    private static File getKafkaConsumerPropertiesFile(String path) {
+        if (path == null) {
+            return null;
+        }
+
+        return new File(path, KAFKA_CONSUMER_PROPERTIES_FILE);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/ffca41be/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
index f4d54c5..e030719 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.source.kafka.hadoop;
 
 import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
 import org.apache.kylin.source.kafka.util.KafkaClient;
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.Path;
@@ -57,10 +58,11 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
     public static final String CONFIG_KAFKA_BROKERS = "kafka.brokers";
     public static final String CONFIG_KAFKA_TOPIC = "kafka.topic";
     public static final String CONFIG_KAFKA_TIMEOUT = "kafka.connect.timeout";
-    public static final String CONFIG_KAFKA_BUFFER_SIZE = "kafka.connect.buffer.size";
     public static final String CONFIG_KAFKA_CONSUMER_GROUP = "kafka.consumer.group";
     public static final String CONFIG_KAFKA_INPUT_FORMAT = "input.format";
     public static final String CONFIG_KAFKA_PARSER_NAME = "kafka.parser.name";
+    public static final String CONFIG_KAFKA_CONSUMER_PROPERTIES = "kafka.consumer.properties";
+
     @Override
     public int run(String[] args) throws Exception {
         Options options = new Options();
@@ -100,10 +102,11 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
 
             JobEngineConfig jobEngineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
             job.getConfiguration().addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null)));
+            KafkaConsumerProperties kafkaFileConfig = KafkaConsumerProperties.getInstanceFromEnv();
+            job.getConfiguration().addResource(kafkaFileConfig.getHadoopJobConfInputStream(), CONFIG_KAFKA_CONSUMER_PROPERTIES);
             job.getConfiguration().set(CONFIG_KAFKA_BROKERS, brokers);
             job.getConfiguration().set(CONFIG_KAFKA_TOPIC, topic);
             job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));
-            job.getConfiguration().set(CONFIG_KAFKA_BUFFER_SIZE, String.valueOf(kafkaConfig.getBufferSize()));
             job.getConfiguration().set(CONFIG_KAFKA_INPUT_FORMAT, "json");
             job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName());
             job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName); // use cubeName as consumer group name

http://git-wip-us.apache.org/repos/asf/kylin/blob/ffca41be/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
index fe0e2cc..0aab72e 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
@@ -19,9 +19,11 @@
 package org.apache.kylin.source.kafka.hadoop;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BytesWritable;
@@ -65,8 +67,11 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
             }
         }
 
+        InputStream inputStream = conf.getConfResourceAsInputStream(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_PROPERTIES);
+        Properties kafkaProperties = new Properties();
+        kafkaProperties.load(inputStream);
         final List<InputSplit> splits = new ArrayList<InputSplit>();
-        try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, null)) {
+        try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties)) {
             final List<PartitionInfo> partitionInfos = consumer.partitionsFor(inputTopic);
             Preconditions.checkArgument(partitionInfos.size() == startOffsetMap.size(), "partition number mismatch with server side");
             for (int i = 0; i < partitionInfos.size(); i++) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/ffca41be/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
index 6774c9d..dfd0e59 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
@@ -19,8 +19,10 @@
 package org.apache.kylin.source.kafka.hadoop;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Iterator;
+import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BytesWritable;
@@ -44,6 +46,8 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
 
     static Logger log = LoggerFactory.getLogger(KafkaInputRecordReader.class);
 
+    public static final long DEFAULT_KAFKA_CONSUMER_POLL_TIMEOUT = 60000;
+
     private Configuration conf;
 
     private KafkaInputSplit split;
@@ -61,8 +65,7 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
     private LongWritable key;
     private BytesWritable value;
 
-    private long timeOut = 60000;
-    private long bufferSize = 65536;
+    private long timeOut = DEFAULT_KAFKA_CONSUMER_POLL_TIMEOUT;
 
     private long numProcessedMessages = 0L;
 
@@ -82,12 +85,13 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
         if (conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TIMEOUT) != null) {
             timeOut = Long.parseLong(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TIMEOUT));
         }
-        if (conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BUFFER_SIZE) != null) {
-            bufferSize = Long.parseLong(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BUFFER_SIZE));
-        }
-
         String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP);
-        consumer = org.apache.kylin.source.kafka.util.KafkaClient.getKafkaConsumer(brokers, consumerGroup, null);
+
+        InputStream inputStream = conf.getConfResourceAsInputStream(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_PROPERTIES);
+        Properties kafkaProperties = new Properties();
+        kafkaProperties.load(inputStream);
+
+        consumer = org.apache.kylin.source.kafka.util.KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties);
 
         earliestOffset = this.split.getOffsetStart();
         latestOffset = this.split.getOffsetEnd();

http://git-wip-us.apache.org/repos/asf/kylin/blob/ffca41be/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
index 3b970b3..f891467 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
@@ -20,15 +20,16 @@ package org.apache.kylin.source.kafka.util;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.config.BrokerConfig;
 import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
 
 import java.util.Arrays;
 import java.util.List;
@@ -39,49 +40,25 @@ import java.util.Properties;
  */
 public class KafkaClient {
 
+    private static KafkaConsumerProperties kafkaFileConfig = KafkaConsumerProperties.getInstanceFromEnv();
+
     public static KafkaConsumer getKafkaConsumer(String brokers, String consumerGroup, Properties properties) {
         Properties props = constructDefaultKafkaConsumerProperties(brokers, consumerGroup, properties);
         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
         return consumer;
     }
 
-    public static KafkaProducer getKafkaProducer(String brokers, Properties properties) {
-        Properties props = constructDefaultKafkaProducerProperties(brokers, properties);
-        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
-        return producer;
-    }
-
-    private static Properties constructDefaultKafkaProducerProperties(String brokers, Properties properties) {
+    private static Properties constructDefaultKafkaConsumerProperties(String brokers, String consumerGroup, Properties properties) {
         Properties props = new Properties();
-        props.put("bootstrap.servers", brokers);
-        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        props.put("acks", "1");
-        props.put("buffer.memory", 33554432);
-        props.put("retries", 0);
-        props.put("batch.size", 16384);
-        props.put("linger.ms", 50);
-        props.put("request.timeout.ms", "30000");
         if (properties != null) {
             for (Map.Entry entry : properties.entrySet()) {
                 props.put(entry.getKey(), entry.getValue());
             }
         }
-        return props;
-    }
-
-    private static Properties constructDefaultKafkaConsumerProperties(String brokers, String consumerGroup, Properties properties) {
-        Properties props = new Properties();
         props.put("bootstrap.servers", brokers);
-        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put("key.deserializer", StringDeserializer.class.getClass().getCanonicalName());
+        props.put("value.deserializer", StringDeserializer.class.getClass().getCanonicalName());
         props.put("group.id", consumerGroup);
-        props.put("session.timeout.ms", "30000");
-        if (properties != null) {
-            for (Map.Entry entry : properties.entrySet()) {
-                props.put(entry.getKey(), entry.getValue());
-            }
-        }
         props.put("enable.auto.commit", "false");
         return props;
     }
@@ -129,7 +106,8 @@ public class KafkaClient {
         final String topic = kafakaConfig.getTopic();
 
         Map<Integer, Long> startOffsets = Maps.newHashMap();
-        try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) {
+        Properties kafkaProperties = kafkaFileConfig.getProperties();
+        try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), kafkaProperties)) {
             final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
             for (PartitionInfo partitionInfo : partitionInfos) {
                 long latest = getLatestOffset(consumer, topic, partitionInfo.partition());
@@ -147,7 +125,8 @@ public class KafkaClient {
         final String topic = kafakaConfig.getTopic();
 
         Map<Integer, Long> startOffsets = Maps.newHashMap();
-        try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) {
+        Properties kafkaProperties = kafkaFileConfig.getProperties();
+        try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), kafkaProperties)) {
             final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
             for (PartitionInfo partitionInfo : partitionInfos) {
                 long latest = getEarliestOffset(consumer, topic, partitionInfo.partition());

http://git-wip-us.apache.org/repos/asf/kylin/blob/ffca41be/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
new file mode 100644
index 0000000..3690439
--- /dev/null
+++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
@@ -0,0 +1,31 @@
+package org.apache.kylin.source.kafka.config;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class KafkaConsumerPropertiesTest extends LocalFileMetadataTestCase {
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testLoadKafkaConfig() {
+        KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv();
+        assertFalse(kafkaConsumerProperties.getProperties().containsKey("acks"));
+        assertTrue(kafkaConsumerProperties.getProperties().containsKey("session.timeout.ms"));
+        assertEquals("30000", kafkaConsumerProperties.getProperties().getProperty("session.timeout.ms"));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/ffca41be/webapp/app/js/model/streamingModel.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/model/streamingModel.js b/webapp/app/js/model/streamingModel.js
index 0321806..70f27aa 100644
--- a/webapp/app/js/model/streamingModel.js
+++ b/webapp/app/js/model/streamingModel.js
@@ -33,9 +33,7 @@ KylinApp.service('StreamingModel', function () {
       "name": "",
       "topic": "",
       "timeout": "60000",
-      "bufferSize": "65536",
       "parserName": "org.apache.kylin.source.kafka.TimedJsonStreamParser",
-      "margin": "300000",
       "clusters":[],
       "parserProperties":""
       }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ffca41be/webapp/app/partials/cubeDesigner/streamingConfig.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/cubeDesigner/streamingConfig.html b/webapp/app/partials/cubeDesigner/streamingConfig.html
index 8bdcd25..5dc9788 100644
--- a/webapp/app/partials/cubeDesigner/streamingConfig.html
+++ b/webapp/app/partials/cubeDesigner/streamingConfig.html
@@ -238,37 +238,9 @@
             </div>
           </div>
 
-          <div class="form-group middle-popover" ng-class="{'required':state.mode=='edit'}">
-            <div class="row">
-              <label class="col-xs-12 col-sm-3 control-label no-padding-right">
-                <b>Buffer Size</b>
-                <i class="fa fa-info-circle" kylinpopover placement="right" title="Buffer Size" template="BufferSizecTip.html"></i>
-              </label>
-
-              <div class="col-xs-12 col-sm-6"
-                   ng-class="{'has-error':form.cube_streaming_form.bufferSize.$invalid && (form.cube_streaming_form.bufferSize.$dirty||form.cube_streaming_form.$submitted)}">
-                <input ng-if="state.mode=='edit'" name="bufferSize" required ng-model="kafkaMeta.bufferSize" type="text"
-                       placeholder="Input kafkaConfig bufferSize"
-                       ng-pattern="/^\+?[1-9][0-9]*$/"
-                       class="form-control"/>
-                <small class="help-block"
-                       ng-show="!form.cube_streaming_form.bufferSize.$error.required && form.cube_streaming_form.bufferSize.$invalid && (form.cube_streaming_form.bufferSize.$dirty||form.cube_streaming_form.$submitted)">
-                  Kafka bufferSize is invalid.
-                </small>
-                <small class="help-block"
-                       ng-show="form.cube_streaming_form.bufferSize.$error.required && (form.cube_streaming_form.bufferSize.$dirty||form.cube_streaming_form.$submitted)">
-                  Kafka bufferSize is required.
-                </small>
-                <span ng-if="state.mode=='view'">{{kafkaMeta.bufferSize}}</span>
-              </div>
-            </div>
-          </div>
-
         </accordion-group>
       </accordion>
 
-
-
     </div>
   </form>
 </div>
@@ -279,11 +251,8 @@
 <script type="text/ng-template" id="TimeoutTip.html">
     <p>Set timeout for kafka client.</p>
   </script>
-<script type="text/ng-template" id="BufferSizecTip.html">
-    <p>Set byte size for kafka client's buffer.</p>
-  </script>
 <script type="text/ng-template" id="MarginTip.html">
-    <p>When the messages in kafka is not strictly sorted on timestamp, read more data (expressed in ts) before and after the specified interval to avoid data loss.</p>
+    <p>Deprecated. When the messages in Kafka are not strictly sorted by timestamp, read more data (expressed in ts) before and after the specified interval to avoid data loss.</p>
   </script>
 <script type="text/ng-template" id="ParserName.html">
     <p>Set the parser to parse source data messages. The default parser works for json messages with a timestamp field.</p>