You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by bi...@apache.org on 2016/12/21 09:45:10 UTC

kylin git commit: KYLIN-2131 Load Kafka Consumer Configuration from kylin-kafka-consumer.xml

Repository: kylin
Updated Branches:
  refs/heads/master 8cb8f279b -> 9dbc24bf3


KYLIN-2131 Load Kafka Consumer Configuration from kylin-kafka-consumer.xml


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/9dbc24bf
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/9dbc24bf
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/9dbc24bf

Branch: refs/heads/master
Commit: 9dbc24bf320ff761ae83028f868feb4a4424f083
Parents: 8cb8f27
Author: Billy Liu <bi...@apache.org>
Authored: Wed Dec 21 17:44:53 2016 +0800
Committer: Billy Liu <bi...@apache.org>
Committed: Wed Dec 21 17:44:53 2016 +0800

----------------------------------------------------------------------
 .../kylin/job/streaming/Kafka10DataLoader.java  |  35 +++-
 build/conf/kylin-kafka-consumer.xml             |  27 ++++
 .../localmeta/kylin-kafka-consumer.xml          |  27 ++++
 .../sandbox/kylin-kafka-consumer.xml            |  27 ++++
 pom.xml                                         |   2 +-
 .../apache/kylin/source/kafka/KafkaSource.java  |   4 +-
 .../source/kafka/config/KafkaClusterConfig.java |   4 -
 .../kylin/source/kafka/config/KafkaConfig.java  |  11 --
 .../kafka/config/KafkaConsumerProperties.java   | 159 +++++++++++++++++++
 .../source/kafka/hadoop/KafkaFlatTableJob.java  |   6 +-
 .../source/kafka/hadoop/KafkaInputFormat.java   |   5 +-
 .../kafka/hadoop/KafkaInputRecordReader.java    |  16 +-
 .../kylin/source/kafka/util/KafkaClient.java    |  34 +---
 .../config/KafkaConsumerPropertiesTest.java     |  50 ++++++
 webapp/app/js/model/streamingModel.js           |   2 -
 .../partials/cubeDesigner/streamingConfig.html  |  33 +---
 16 files changed, 344 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
index 8c548be..fae81ce 100644
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.job.streaming;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import javax.annotation.Nullable;
@@ -26,6 +27,7 @@ import javax.annotation.Nullable;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kylin.source.kafka.config.BrokerConfig;
 import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
@@ -35,8 +37,6 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
 
-import org.apache.kylin.source.kafka.util.KafkaClient;
-
 /**
  * Load prepared data into kafka(for test use)
  */
@@ -60,10 +60,7 @@ public class Kafka10DataLoader extends StreamDataLoader {
             }
         }), ",");
 
-        Properties props = new Properties();
-        props.put("acks", "1");
-        props.put("retry.backoff.ms", "1000");
-        KafkaProducer producer = KafkaClient.getKafkaProducer(brokerList, props);
+        KafkaProducer producer = getKafkaProducer(brokerList, null);
 
         for (int i = 0; i < messages.size(); i++) {
             ProducerRecord<String, String> keyedMessage = new ProducerRecord<String, String>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i));
@@ -73,4 +70,30 @@ public class Kafka10DataLoader extends StreamDataLoader {
         producer.close();
     }
 
+    public static KafkaProducer getKafkaProducer(String brokers, Properties properties) {
+        Properties props = constructDefaultKafkaProducerProperties(brokers, properties);
+        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
+        return producer;
+    }
+
+    private static Properties constructDefaultKafkaProducerProperties(String brokers, Properties properties) {
+        Properties props = new Properties();
+        props.put("retry.backoff.ms", "1000");
+        props.put("bootstrap.servers", brokers);
+        props.put("key.serializer", StringSerializer.class.getName());
+        props.put("value.serializer", StringSerializer.class.getName());
+        props.put("acks", "1");
+        props.put("buffer.memory", 33554432);
+        props.put("retries", 0);
+        props.put("batch.size", 16384);
+        props.put("linger.ms", 50);
+        props.put("request.timeout.ms", "30000");
+        if (properties != null) {
+            for (Map.Entry entry : properties.entrySet()) {
+                props.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return props;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/build/conf/kylin-kafka-consumer.xml
----------------------------------------------------------------------
diff --git a/build/conf/kylin-kafka-consumer.xml b/build/conf/kylin-kafka-consumer.xml
new file mode 100644
index 0000000..6da7004
--- /dev/null
+++ b/build/conf/kylin-kafka-consumer.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+ For more Kafka consumer configs, please refer to http://kafka.apache.org/documentation#consumerconfigs
+-->
+<configuration>
+    <property>
+        <name>session.timeout.ms</name>
+        <value>30000</value>
+    </property>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/examples/test_case_data/localmeta/kylin-kafka-consumer.xml
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kylin-kafka-consumer.xml b/examples/test_case_data/localmeta/kylin-kafka-consumer.xml
new file mode 100644
index 0000000..6da7004
--- /dev/null
+++ b/examples/test_case_data/localmeta/kylin-kafka-consumer.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+ For more Kafka consumer configs, please refer to http://kafka.apache.org/documentation#consumerconfigs
+-->
+<configuration>
+    <property>
+        <name>session.timeout.ms</name>
+        <value>30000</value>
+    </property>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/examples/test_case_data/sandbox/kylin-kafka-consumer.xml
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin-kafka-consumer.xml b/examples/test_case_data/sandbox/kylin-kafka-consumer.xml
new file mode 100644
index 0000000..6da7004
--- /dev/null
+++ b/examples/test_case_data/sandbox/kylin-kafka-consumer.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+ For more Kafka consumer configs, please refer to http://kafka.apache.org/documentation#consumerconfigs
+-->
+<configuration>
+    <property>
+        <name>session.timeout.ms</name>
+        <value>30000</value>
+    </property>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 51479c8..c837b10 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,7 +55,7 @@
 
         <!-- HBase versions -->
         <hbase-hadoop2.version>0.98.8-hadoop2</hbase-hadoop2.version>
-        <kafka.version>0.10.0.0</kafka.version>
+        <kafka.version>0.10.1.0</kafka.version>
 
         <!-- Hadoop deps, keep compatible with hadoop2.version -->
         <zookeeper.version>3.4.6</zookeeper.version>

http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index e469f77..12f3da2 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -83,8 +83,8 @@ public class KafkaSource implements ISource {
                 logger.debug("Last segment doesn't exist, use the start offset that be initiated previously: " + cube.getDescriptor().getPartitionOffsetStart());
                 result.setSourcePartitionOffsetStart(cube.getDescriptor().getPartitionOffsetStart());
             } else {
-                // from the topic's very begining;
-                logger.debug("Last segment doesn't exist, and didn't initiate the start offset, will seek from topic's very beginning.");
+                // from the topic's earliest offset;
+                logger.debug("Last segment doesn't exist, and didn't initiate the start offset, will seek from topic's earliest offset.");
                 result.setSourcePartitionOffsetStart(KafkaClient.getEarliestOffsets(cube));
             }
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
index 95349c2..afe888f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
@@ -47,10 +47,6 @@ public class KafkaClusterConfig extends RootPersistentEntity {
     @JsonBackReference
     private KafkaConfig kafkaConfig;
 
-    public int getBufferSize() {
-        return kafkaConfig.getBufferSize();
-    }
-
     public String getTopic() {
         return kafkaConfig.getTopic();
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
index 157d83c..a096344 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
@@ -55,9 +55,6 @@ public class KafkaConfig extends RootPersistentEntity {
     @JsonProperty("timeout")
     private int timeout;
 
-    @JsonProperty("bufferSize")
-    private int bufferSize;
-
     @JsonProperty("parserName")
     private String parserName;
 
@@ -97,14 +94,6 @@ public class KafkaConfig extends RootPersistentEntity {
         this.timeout = timeout;
     }
 
-    public int getBufferSize() {
-        return bufferSize;
-    }
-
-    public void setBufferSize(int bufferSize) {
-        this.bufferSize = bufferSize;
-    }
-
     public String getTopic() {
         return topic;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
new file mode 100644
index 0000000..5bc1a82
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.kafka.config;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinConfigCannotInitException;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaConsumerProperties {
+
+    public static final String KAFKA_CONSUMER_FILE = "kylin-kafka-consumer.xml";
+    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerProperties.class);
+    // static cached singleton instance
+    private static KafkaConsumerProperties ENV_INSTANCE = null;
+    private volatile Properties properties = new Properties();
+
+    private KafkaConsumerProperties() {
+
+    }
+
+    public static KafkaConsumerProperties getInstanceFromEnv() {
+        synchronized (KafkaConsumerProperties.class) {
+            if (ENV_INSTANCE == null) {
+                try {
+                    KafkaConsumerProperties config = new KafkaConsumerProperties();
+                    config.properties = config.loadKafkaConsumerProperties();
+
+                    logger.info("Initialized a new KafkaConsumerProperties from getInstanceFromEnv : " + System.identityHashCode(config));
+                    ENV_INSTANCE = config;
+                } catch (IllegalArgumentException e) {
+                    throw new IllegalStateException("Failed to find KafkaConsumerProperties ", e);
+                }
+            }
+            return ENV_INSTANCE;
+        }
+    }
+
+    private static File getKafkaConsumerFile(String path) {
+        if (path == null) {
+            return null;
+        }
+
+        return new File(path, KAFKA_CONSUMER_FILE);
+    }
+
+    public static Properties getProperties(Configuration configuration) {
+        Set<String> configNames = new HashSet<String>();
+        try {
+            configNames = ConsumerConfig.configNames();
+        } catch (Exception e) {
+            // the Kafka configNames api is supported on 0.10.1.0+, in case NoSuchMethodException
+            String[] configNamesArray = ("metric.reporters, metadata.max.age.ms, partition.assignment.strategy, reconnect.backoff.ms," + "sasl.kerberos.ticket.renew.window.factor, max.partition.fetch.bytes, bootstrap.servers, ssl.keystore.type," + " enable.auto.commit, sasl.mechanism, interceptor.classes, exclude.internal.topics, ssl.truststore.password," + " client.id, ssl.endpoint.identification.algorithm, max.poll.records, check.crcs, request.timeout.ms, heartbeat.interval.ms," + " auto.commit.interval.ms, receive.buffer.bytes, ssl.truststore.type, ssl.truststore.location, ssl.keystore.password, fetch.min.bytes," + " fetch.max.bytes, send.buffer.bytes, max.poll.interval.ms, value.deserializer, group.id, retry.backoff.ms,"
+                    + " ssl.secure.random.implementation, sasl.kerberos.kinit.cmd, sasl.kerberos.service.name, sasl.kerberos.ticket.renew.jitter, ssl.trustmanager.algorithm, ssl.key.password, fetch.max.wait.ms, sasl.kerberos.min.time.before.relogin, connections.max.idle.ms, session.timeout.ms, metrics.num.samples, key.deserializer, ssl.protocol, ssl.provider, ssl.enabled.protocols, ssl.keystore.location, ssl.cipher.suites, security.protocol, ssl.keymanager.algorithm, metrics.sample.window.ms, auto.offset.reset").split(",");
+            configNames.addAll(Arrays.asList(configNamesArray));
+        }
+
+        Properties result = new Properties();
+        for (Iterator<Map.Entry<String, String>> it = configuration.iterator(); it.hasNext();) {
+            Map.Entry<String, String> entry = it.next();
+            String key = entry.getKey();
+            String value = entry.getValue();
+            if (configNames.contains(key)) {
+                result.put(key, value);
+            }
+        }
+        return result;
+    }
+
+    private Properties loadKafkaConsumerProperties() {
+        File propFile = getKafkaConsumerFile();
+        if (propFile == null || !propFile.exists()) {
+            logger.error("fail to locate " + KAFKA_CONSUMER_FILE);
+            throw new RuntimeException("fail to locate " + KAFKA_CONSUMER_FILE);
+        }
+        Properties properties = new Properties();
+        try {
+            FileInputStream is = new FileInputStream(propFile);
+            Configuration conf = new Configuration();
+            conf.addResource(is);
+            properties.putAll(getProperties(conf));
+            IOUtils.closeQuietly(is);
+
+            File propOverrideFile = new File(propFile.getParentFile(), propFile.getName() + ".override");
+            if (propOverrideFile.exists()) {
+                FileInputStream ois = new FileInputStream(propOverrideFile);
+                Properties propOverride = new Properties();
+                Configuration oconf = new Configuration();
+                oconf.addResource(ois);
+                properties.putAll(getProperties(oconf));
+                IOUtils.closeQuietly(ois);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        return properties;
+    }
+
+    public String getKafkaConsumerHadoopJobConf() {
+        File kafkaConsumerFile = getKafkaConsumerFile();
+        return OptionsHelper.convertToFileURL(kafkaConsumerFile.getAbsolutePath());
+    }
+
+    private File getKafkaConsumerFile() {
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        String kylinConfHome = System.getProperty(KylinConfig.KYLIN_CONF);
+        if (!StringUtils.isEmpty(kylinConfHome)) {
+            logger.info("Use KYLIN_CONF=" + kylinConfHome);
+            return getKafkaConsumerFile(kylinConfHome);
+        }
+
+        logger.warn("KYLIN_CONF property was not set, will seek KYLIN_HOME env variable");
+
+        String kylinHome = kylinConfig.getKylinHome();
+        if (StringUtils.isEmpty(kylinHome))
+            throw new KylinConfigCannotInitException("Didn't find KYLIN_CONF or KYLIN_HOME, please set one of them");
+
+        String path = kylinHome + File.separator + "conf";
+        return getKafkaConsumerFile(path);
+    }
+
+    public Properties getProperties() {
+        Properties prop = new Properties();
+        prop.putAll(this.properties);
+        return prop;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
index f4d54c5..40f12ee 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.source.kafka.hadoop;
 
 import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
 import org.apache.kylin.source.kafka.util.KafkaClient;
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.Path;
@@ -57,10 +58,10 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
     public static final String CONFIG_KAFKA_BROKERS = "kafka.brokers";
     public static final String CONFIG_KAFKA_TOPIC = "kafka.topic";
     public static final String CONFIG_KAFKA_TIMEOUT = "kafka.connect.timeout";
-    public static final String CONFIG_KAFKA_BUFFER_SIZE = "kafka.connect.buffer.size";
     public static final String CONFIG_KAFKA_CONSUMER_GROUP = "kafka.consumer.group";
     public static final String CONFIG_KAFKA_INPUT_FORMAT = "input.format";
     public static final String CONFIG_KAFKA_PARSER_NAME = "kafka.parser.name";
+
     @Override
     public int run(String[] args) throws Exception {
         Options options = new Options();
@@ -100,10 +101,11 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
 
             JobEngineConfig jobEngineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
             job.getConfiguration().addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null)));
+            KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv();
+            job.getConfiguration().addResource(new Path(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf()));
             job.getConfiguration().set(CONFIG_KAFKA_BROKERS, brokers);
             job.getConfiguration().set(CONFIG_KAFKA_TOPIC, topic);
             job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));
-            job.getConfiguration().set(CONFIG_KAFKA_BUFFER_SIZE, String.valueOf(kafkaConfig.getBufferSize()));
             job.getConfiguration().set(CONFIG_KAFKA_INPUT_FORMAT, "json");
             job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName());
             job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName); // use cubeName as consumer group name

http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
index fe0e2cc..96bac3f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BytesWritable;
@@ -33,6 +34,7 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
 import org.apache.kylin.source.kafka.util.KafkaClient;
 
 import com.google.common.base.Preconditions;
@@ -65,8 +67,9 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
             }
         }
 
+        Properties kafkaProperties = KafkaConsumerProperties.getProperties(conf);
         final List<InputSplit> splits = new ArrayList<InputSplit>();
-        try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, null)) {
+        try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties)) {
             final List<PartitionInfo> partitionInfos = consumer.partitionsFor(inputTopic);
             Preconditions.checkArgument(partitionInfos.size() == startOffsetMap.size(), "partition number mismatch with server side");
             for (int i = 0; i < partitionInfos.size(); i++) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
index 6774c9d..e8bd76c 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
@@ -21,6 +21,7 @@ package org.apache.kylin.source.kafka.hadoop;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Iterator;
+import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BytesWritable;
@@ -33,6 +34,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,6 +46,8 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
 
     static Logger log = LoggerFactory.getLogger(KafkaInputRecordReader.class);
 
+    public static final long DEFAULT_KAFKA_CONSUMER_POLL_TIMEOUT = 60000;
+
     private Configuration conf;
 
     private KafkaInputSplit split;
@@ -61,8 +65,7 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
     private LongWritable key;
     private BytesWritable value;
 
-    private long timeOut = 60000;
-    private long bufferSize = 65536;
+    private long timeOut = DEFAULT_KAFKA_CONSUMER_POLL_TIMEOUT;
 
     private long numProcessedMessages = 0L;
 
@@ -82,12 +85,11 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
         if (conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TIMEOUT) != null) {
             timeOut = Long.parseLong(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TIMEOUT));
         }
-        if (conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BUFFER_SIZE) != null) {
-            bufferSize = Long.parseLong(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BUFFER_SIZE));
-        }
-
         String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP);
-        consumer = org.apache.kylin.source.kafka.util.KafkaClient.getKafkaConsumer(brokers, consumerGroup, null);
+
+        Properties kafkaProperties = KafkaConsumerProperties.getProperties(conf);
+
+        consumer = org.apache.kylin.source.kafka.util.KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties);
 
         earliestOffset = this.split.getOffsetStart();
         latestOffset = this.split.getOffsetEnd();

http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
index 3b970b3..69d7440 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
@@ -20,9 +20,9 @@ package org.apache.kylin.source.kafka.util;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
@@ -45,43 +45,17 @@ public class KafkaClient {
         return consumer;
     }
 
-    public static KafkaProducer getKafkaProducer(String brokers, Properties properties) {
-        Properties props = constructDefaultKafkaProducerProperties(brokers, properties);
-        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
-        return producer;
-    }
-
-    private static Properties constructDefaultKafkaProducerProperties(String brokers, Properties properties) {
+    private static Properties constructDefaultKafkaConsumerProperties(String brokers, String consumerGroup, Properties properties) {
         Properties props = new Properties();
-        props.put("bootstrap.servers", brokers);
-        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        props.put("acks", "1");
-        props.put("buffer.memory", 33554432);
-        props.put("retries", 0);
-        props.put("batch.size", 16384);
-        props.put("linger.ms", 50);
-        props.put("request.timeout.ms", "30000");
         if (properties != null) {
             for (Map.Entry entry : properties.entrySet()) {
                 props.put(entry.getKey(), entry.getValue());
             }
         }
-        return props;
-    }
-
-    private static Properties constructDefaultKafkaConsumerProperties(String brokers, String consumerGroup, Properties properties) {
-        Properties props = new Properties();
         props.put("bootstrap.servers", brokers);
-        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put("key.deserializer", StringDeserializer.class.getName());
+        props.put("value.deserializer", StringDeserializer.class.getName());
         props.put("group.id", consumerGroup);
-        props.put("session.timeout.ms", "30000");
-        if (properties != null) {
-            for (Map.Entry entry : properties.entrySet()) {
-                props.put(entry.getKey(), entry.getValue());
-            }
-        }
         props.put("enable.auto.commit", "false");
         return props;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
new file mode 100644
index 0000000..8edb84d
--- /dev/null
+++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
@@ -0,0 +1,50 @@
+package org.apache.kylin.source.kafka.config;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Properties;
+
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.xml.sax.SAXException;
+
+public class KafkaConsumerPropertiesTest extends LocalFileMetadataTestCase {
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testLoadKafkaProperties() {
+        KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv();
+        assertFalse(kafkaConsumerProperties.getProperties().containsKey("acks"));
+        assertTrue(kafkaConsumerProperties.getProperties().containsKey("session.timeout.ms"));
+        assertEquals("30000", kafkaConsumerProperties.getProperties().getProperty("session.timeout.ms"));
+    }
+
+    @Test
+    public void testLoadKafkaPropertiesAsHadoopJobConf() throws IOException, ParserConfigurationException, SAXException {
+        KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv();
+        Configuration conf = new Configuration(false);
+        conf.addResource(new FileInputStream(new File(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf())), KafkaConsumerProperties.KAFKA_CONSUMER_FILE);
+        assertEquals("30000", conf.get("session.timeout.ms"));
+
+        Properties prop = KafkaConsumerProperties.getProperties(conf);
+        assertEquals("30000", prop.getProperty("session.timeout.ms"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/webapp/app/js/model/streamingModel.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/model/streamingModel.js b/webapp/app/js/model/streamingModel.js
index 0321806..70f27aa 100644
--- a/webapp/app/js/model/streamingModel.js
+++ b/webapp/app/js/model/streamingModel.js
@@ -33,9 +33,7 @@ KylinApp.service('StreamingModel', function () {
       "name": "",
       "topic": "",
       "timeout": "60000",
-      "bufferSize": "65536",
       "parserName": "org.apache.kylin.source.kafka.TimedJsonStreamParser",
-      "margin": "300000",
       "clusters":[],
       "parserProperties":""
       }

http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/webapp/app/partials/cubeDesigner/streamingConfig.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/cubeDesigner/streamingConfig.html b/webapp/app/partials/cubeDesigner/streamingConfig.html
index 8bdcd25..5dc9788 100644
--- a/webapp/app/partials/cubeDesigner/streamingConfig.html
+++ b/webapp/app/partials/cubeDesigner/streamingConfig.html
@@ -238,37 +238,9 @@
             </div>
           </div>
 
-          <div class="form-group middle-popover" ng-class="{'required':state.mode=='edit'}">
-            <div class="row">
-              <label class="col-xs-12 col-sm-3 control-label no-padding-right">
-                <b>Buffer Size</b>
-                <i class="fa fa-info-circle" kylinpopover placement="right" title="Buffer Size" template="BufferSizecTip.html"></i>
-              </label>
-
-              <div class="col-xs-12 col-sm-6"
-                   ng-class="{'has-error':form.cube_streaming_form.bufferSize.$invalid && (form.cube_streaming_form.bufferSize.$dirty||form.cube_streaming_form.$submitted)}">
-                <input ng-if="state.mode=='edit'" name="bufferSize" required ng-model="kafkaMeta.bufferSize" type="text"
-                       placeholder="Input kafkaConfig bufferSize"
-                       ng-pattern="/^\+?[1-9][0-9]*$/"
-                       class="form-control"/>
-                <small class="help-block"
-                       ng-show="!form.cube_streaming_form.bufferSize.$error.required && form.cube_streaming_form.bufferSize.$invalid && (form.cube_streaming_form.bufferSize.$dirty||form.cube_streaming_form.$submitted)">
-                  Kafka bufferSize is invalid.
-                </small>
-                <small class="help-block"
-                       ng-show="form.cube_streaming_form.bufferSize.$error.required && (form.cube_streaming_form.bufferSize.$dirty||form.cube_streaming_form.$submitted)">
-                  Kafka bufferSize is required.
-                </small>
-                <span ng-if="state.mode=='view'">{{kafkaMeta.bufferSize}}</span>
-              </div>
-            </div>
-          </div>
-
         </accordion-group>
       </accordion>
 
-
-
     </div>
   </form>
 </div>
@@ -279,11 +251,8 @@
 <script type="text/ng-template" id="TimeoutTip.html">
     <p>Set timeout for kafka client.</p>
   </script>
-<script type="text/ng-template" id="BufferSizecTip.html">
-    <p>Set byte size for kafka client's buffer.</p>
-  </script>
 <script type="text/ng-template" id="MarginTip.html">
-    <p>When the messages in kafka is not strictly sorted on timestamp, read more data (expressed in ts) before and after the specified interval to avoid data loss.</p>
+    <p>Deprecated. When the messages in kafka is not strictly sorted on timestamp, read more data (expressed in ts) before and after the specified interval to avoid data loss.</p>
   </script>
 <script type="text/ng-template" id="ParserName.html">
     <p>Set the parser to parse source data messages. The default parser works for json messages with a timestamp field.</p>