You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by bi...@apache.org on 2016/12/21 09:45:10 UTC
kylin git commit: KYLIN-2131 Load Kafka Consumer Configuration from
kylin-kafka-consumer.xml
Repository: kylin
Updated Branches:
refs/heads/master 8cb8f279b -> 9dbc24bf3
KYLIN-2131 Load Kafka Consumer Configuration from kylin-kafka-consumer.xml
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/9dbc24bf
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/9dbc24bf
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/9dbc24bf
Branch: refs/heads/master
Commit: 9dbc24bf320ff761ae83028f868feb4a4424f083
Parents: 8cb8f27
Author: Billy Liu <bi...@apache.org>
Authored: Wed Dec 21 17:44:53 2016 +0800
Committer: Billy Liu <bi...@apache.org>
Committed: Wed Dec 21 17:44:53 2016 +0800
----------------------------------------------------------------------
.../kylin/job/streaming/Kafka10DataLoader.java | 35 +++-
build/conf/kylin-kafka-consumer.xml | 27 ++++
.../localmeta/kylin-kafka-consumer.xml | 27 ++++
.../sandbox/kylin-kafka-consumer.xml | 27 ++++
pom.xml | 2 +-
.../apache/kylin/source/kafka/KafkaSource.java | 4 +-
.../source/kafka/config/KafkaClusterConfig.java | 4 -
.../kylin/source/kafka/config/KafkaConfig.java | 11 --
.../kafka/config/KafkaConsumerProperties.java | 159 +++++++++++++++++++
.../source/kafka/hadoop/KafkaFlatTableJob.java | 6 +-
.../source/kafka/hadoop/KafkaInputFormat.java | 5 +-
.../kafka/hadoop/KafkaInputRecordReader.java | 16 +-
.../kylin/source/kafka/util/KafkaClient.java | 34 +---
.../config/KafkaConsumerPropertiesTest.java | 50 ++++++
webapp/app/js/model/streamingModel.js | 2 -
.../partials/cubeDesigner/streamingConfig.html | 33 +---
16 files changed, 344 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
index 8c548be..fae81ce 100644
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
@@ -19,6 +19,7 @@
package org.apache.kylin.job.streaming;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import javax.annotation.Nullable;
@@ -26,6 +27,7 @@ import javax.annotation.Nullable;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kylin.source.kafka.config.BrokerConfig;
import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
import org.apache.kylin.source.kafka.config.KafkaConfig;
@@ -35,8 +37,6 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
-import org.apache.kylin.source.kafka.util.KafkaClient;
-
/**
* Load prepared data into kafka(for test use)
*/
@@ -60,10 +60,7 @@ public class Kafka10DataLoader extends StreamDataLoader {
}
}), ",");
- Properties props = new Properties();
- props.put("acks", "1");
- props.put("retry.backoff.ms", "1000");
- KafkaProducer producer = KafkaClient.getKafkaProducer(brokerList, props);
+ KafkaProducer producer = getKafkaProducer(brokerList, null);
for (int i = 0; i < messages.size(); i++) {
ProducerRecord<String, String> keyedMessage = new ProducerRecord<String, String>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i));
@@ -73,4 +70,30 @@ public class Kafka10DataLoader extends StreamDataLoader {
producer.close();
}
+ public static KafkaProducer getKafkaProducer(String brokers, Properties properties) {
+ Properties props = constructDefaultKafkaProducerProperties(brokers, properties);
+ KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
+ return producer;
+ }
+
+ private static Properties constructDefaultKafkaProducerProperties(String brokers, Properties properties) {
+ Properties props = new Properties();
+ props.put("retry.backoff.ms", "1000");
+ props.put("bootstrap.servers", brokers);
+ props.put("key.serializer", StringSerializer.class.getName());
+ props.put("value.serializer", StringSerializer.class.getName());
+ props.put("acks", "1");
+ props.put("buffer.memory", 33554432);
+ props.put("retries", 0);
+ props.put("batch.size", 16384);
+ props.put("linger.ms", 50);
+ props.put("request.timeout.ms", "30000");
+ if (properties != null) {
+ for (Map.Entry entry : properties.entrySet()) {
+ props.put(entry.getKey(), entry.getValue());
+ }
+ }
+ return props;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/build/conf/kylin-kafka-consumer.xml
----------------------------------------------------------------------
diff --git a/build/conf/kylin-kafka-consumer.xml b/build/conf/kylin-kafka-consumer.xml
new file mode 100644
index 0000000..6da7004
--- /dev/null
+++ b/build/conf/kylin-kafka-consumer.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ for more kafka consumer configs, please refer to http://kafka.apache.org/documentation#consumerconfigs
+-->
+<configuration>
+ <property>
+ <name>session.timeout.ms</name>
+ <value>30000</value>
+ </property>
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/examples/test_case_data/localmeta/kylin-kafka-consumer.xml
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kylin-kafka-consumer.xml b/examples/test_case_data/localmeta/kylin-kafka-consumer.xml
new file mode 100644
index 0000000..6da7004
--- /dev/null
+++ b/examples/test_case_data/localmeta/kylin-kafka-consumer.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ for more kafka consumer configs, please refer to http://kafka.apache.org/documentation#consumerconfigs
+-->
+<configuration>
+ <property>
+ <name>session.timeout.ms</name>
+ <value>30000</value>
+ </property>
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/examples/test_case_data/sandbox/kylin-kafka-consumer.xml
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin-kafka-consumer.xml b/examples/test_case_data/sandbox/kylin-kafka-consumer.xml
new file mode 100644
index 0000000..6da7004
--- /dev/null
+++ b/examples/test_case_data/sandbox/kylin-kafka-consumer.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ for more kafka consumer configs, please refer to http://kafka.apache.org/documentation#consumerconfigs
+-->
+<configuration>
+ <property>
+ <name>session.timeout.ms</name>
+ <value>30000</value>
+ </property>
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 51479c8..c837b10 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,7 +55,7 @@
<!-- HBase versions -->
<hbase-hadoop2.version>0.98.8-hadoop2</hbase-hadoop2.version>
- <kafka.version>0.10.0.0</kafka.version>
+ <kafka.version>0.10.1.0</kafka.version>
<!-- Hadoop deps, keep compatible with hadoop2.version -->
<zookeeper.version>3.4.6</zookeeper.version>
http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index e469f77..12f3da2 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -83,8 +83,8 @@ public class KafkaSource implements ISource {
logger.debug("Last segment doesn't exist, use the start offset that be initiated previously: " + cube.getDescriptor().getPartitionOffsetStart());
result.setSourcePartitionOffsetStart(cube.getDescriptor().getPartitionOffsetStart());
} else {
- // from the topic's very begining;
- logger.debug("Last segment doesn't exist, and didn't initiate the start offset, will seek from topic's very beginning.");
+ // from the topic's earliest offset;
+ logger.debug("Last segment doesn't exist, and didn't initiate the start offset, will seek from topic's earliest offset.");
result.setSourcePartitionOffsetStart(KafkaClient.getEarliestOffsets(cube));
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
index 95349c2..afe888f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
@@ -47,10 +47,6 @@ public class KafkaClusterConfig extends RootPersistentEntity {
@JsonBackReference
private KafkaConfig kafkaConfig;
- public int getBufferSize() {
- return kafkaConfig.getBufferSize();
- }
-
public String getTopic() {
return kafkaConfig.getTopic();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
index 157d83c..a096344 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
@@ -55,9 +55,6 @@ public class KafkaConfig extends RootPersistentEntity {
@JsonProperty("timeout")
private int timeout;
- @JsonProperty("bufferSize")
- private int bufferSize;
-
@JsonProperty("parserName")
private String parserName;
@@ -97,14 +94,6 @@ public class KafkaConfig extends RootPersistentEntity {
this.timeout = timeout;
}
- public int getBufferSize() {
- return bufferSize;
- }
-
- public void setBufferSize(int bufferSize) {
- this.bufferSize = bufferSize;
- }
-
public String getTopic() {
return topic;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
new file mode 100644
index 0000000..5bc1a82
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.kafka.config;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinConfigCannotInitException;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaConsumerProperties {
+
+ public static final String KAFKA_CONSUMER_FILE = "kylin-kafka-consumer.xml";
+ private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerProperties.class);
+ // static cached instances
+ private static KafkaConsumerProperties ENV_INSTANCE = null;
+ private volatile Properties properties = new Properties();
+
+ private KafkaConsumerProperties() {
+
+ }
+
+ public static KafkaConsumerProperties getInstanceFromEnv() {
+ synchronized (KafkaConsumerProperties.class) {
+ if (ENV_INSTANCE == null) {
+ try {
+ KafkaConsumerProperties config = new KafkaConsumerProperties();
+ config.properties = config.loadKafkaConsumerProperties();
+
+ logger.info("Initialized a new KafkaConsumerProperties from getInstanceFromEnv : " + System.identityHashCode(config));
+ ENV_INSTANCE = config;
+ } catch (IllegalArgumentException e) {
+ throw new IllegalStateException("Failed to find KafkaConsumerProperties ", e);
+ }
+ }
+ return ENV_INSTANCE;
+ }
+ }
+
+ private static File getKafkaConsumerFile(String path) {
+ if (path == null) {
+ return null;
+ }
+
+ return new File(path, KAFKA_CONSUMER_FILE);
+ }
+
+ public static Properties getProperties(Configuration configuration) {
+ Set<String> configNames = new HashSet<String>();
+ try {
+ configNames = ConsumerConfig.configNames();
+ } catch (Exception e) {
+ // the Kafka configNames api is supported on 0.10.1.0+, in case NoSuchMethodException
+ String[] configNamesArray = ("metric.reporters, metadata.max.age.ms, partition.assignment.strategy, reconnect.backoff.ms," + "sasl.kerberos.ticket.renew.window.factor, max.partition.fetch.bytes, bootstrap.servers, ssl.keystore.type," + " enable.auto.commit, sasl.mechanism, interceptor.classes, exclude.internal.topics, ssl.truststore.password," + " client.id, ssl.endpoint.identification.algorithm, max.poll.records, check.crcs, request.timeout.ms, heartbeat.interval.ms," + " auto.commit.interval.ms, receive.buffer.bytes, ssl.truststore.type, ssl.truststore.location, ssl.keystore.password, fetch.min.bytes," + " fetch.max.bytes, send.buffer.bytes, max.poll.interval.ms, value.deserializer, group.id, retry.backoff.ms,"
+ + " ssl.secure.random.implementation, sasl.kerberos.kinit.cmd, sasl.kerberos.service.name, sasl.kerberos.ticket.renew.jitter, ssl.trustmanager.algorithm, ssl.key.password, fetch.max.wait.ms, sasl.kerberos.min.time.before.relogin, connections.max.idle.ms, session.timeout.ms, metrics.num.samples, key.deserializer, ssl.protocol, ssl.provider, ssl.enabled.protocols, ssl.keystore.location, ssl.cipher.suites, security.protocol, ssl.keymanager.algorithm, metrics.sample.window.ms, auto.offset.reset").split(",");
+ configNames.addAll(Arrays.asList(configNamesArray));
+ }
+
+ Properties result = new Properties();
+ for (Iterator<Map.Entry<String, String>> it = configuration.iterator(); it.hasNext();) {
+ Map.Entry<String, String> entry = it.next();
+ String key = entry.getKey();
+ String value = entry.getValue();
+ if (configNames.contains(key)) {
+ result.put(key, value);
+ }
+ }
+ return result;
+ }
+
+ private Properties loadKafkaConsumerProperties() {
+ File propFile = getKafkaConsumerFile();
+ if (propFile == null || !propFile.exists()) {
+ logger.error("fail to locate " + KAFKA_CONSUMER_FILE);
+ throw new RuntimeException("fail to locate " + KAFKA_CONSUMER_FILE);
+ }
+ Properties properties = new Properties();
+ try {
+ FileInputStream is = new FileInputStream(propFile);
+ Configuration conf = new Configuration();
+ conf.addResource(is);
+ properties.putAll(getProperties(conf));
+ IOUtils.closeQuietly(is);
+
+ File propOverrideFile = new File(propFile.getParentFile(), propFile.getName() + ".override");
+ if (propOverrideFile.exists()) {
+ FileInputStream ois = new FileInputStream(propOverrideFile);
+ Properties propOverride = new Properties();
+ Configuration oconf = new Configuration();
+ oconf.addResource(ois);
+ properties.putAll(getProperties(oconf));
+ IOUtils.closeQuietly(ois);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ return properties;
+ }
+
+ public String getKafkaConsumerHadoopJobConf() {
+ File kafkaConsumerFile = getKafkaConsumerFile();
+ return OptionsHelper.convertToFileURL(kafkaConsumerFile.getAbsolutePath());
+ }
+
+ private File getKafkaConsumerFile() {
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ String kylinConfHome = System.getProperty(KylinConfig.KYLIN_CONF);
+ if (!StringUtils.isEmpty(kylinConfHome)) {
+ logger.info("Use KYLIN_CONF=" + kylinConfHome);
+ return getKafkaConsumerFile(kylinConfHome);
+ }
+
+ logger.warn("KYLIN_CONF property was not set, will seek KYLIN_HOME env variable");
+
+ String kylinHome = kylinConfig.getKylinHome();
+ if (StringUtils.isEmpty(kylinHome))
+ throw new KylinConfigCannotInitException("Didn't find KYLIN_CONF or KYLIN_HOME, please set one of them");
+
+ String path = kylinHome + File.separator + "conf";
+ return getKafkaConsumerFile(path);
+ }
+
+ public Properties getProperties() {
+ Properties prop = new Properties();
+ prop.putAll(this.properties);
+ return prop;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
index f4d54c5..40f12ee 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -19,6 +19,7 @@
package org.apache.kylin.source.kafka.hadoop;
import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
import org.apache.kylin.source.kafka.util.KafkaClient;
import org.apache.commons.cli.Options;
import org.apache.hadoop.fs.Path;
@@ -57,10 +58,10 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
public static final String CONFIG_KAFKA_BROKERS = "kafka.brokers";
public static final String CONFIG_KAFKA_TOPIC = "kafka.topic";
public static final String CONFIG_KAFKA_TIMEOUT = "kafka.connect.timeout";
- public static final String CONFIG_KAFKA_BUFFER_SIZE = "kafka.connect.buffer.size";
public static final String CONFIG_KAFKA_CONSUMER_GROUP = "kafka.consumer.group";
public static final String CONFIG_KAFKA_INPUT_FORMAT = "input.format";
public static final String CONFIG_KAFKA_PARSER_NAME = "kafka.parser.name";
+
@Override
public int run(String[] args) throws Exception {
Options options = new Options();
@@ -100,10 +101,11 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
JobEngineConfig jobEngineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
job.getConfiguration().addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null)));
+ KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv();
+ job.getConfiguration().addResource(new Path(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf()));
job.getConfiguration().set(CONFIG_KAFKA_BROKERS, brokers);
job.getConfiguration().set(CONFIG_KAFKA_TOPIC, topic);
job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));
- job.getConfiguration().set(CONFIG_KAFKA_BUFFER_SIZE, String.valueOf(kafkaConfig.getBufferSize()));
job.getConfiguration().set(CONFIG_KAFKA_INPUT_FORMAT, "json");
job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName());
job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName); // use cubeName as consumer group name
http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
index fe0e2cc..96bac3f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
@@ -33,6 +34,7 @@ import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
+import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
import org.apache.kylin.source.kafka.util.KafkaClient;
import com.google.common.base.Preconditions;
@@ -65,8 +67,9 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
}
}
+ Properties kafkaProperties = KafkaConsumerProperties.getProperties(conf);
final List<InputSplit> splits = new ArrayList<InputSplit>();
- try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, null)) {
+ try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties)) {
final List<PartitionInfo> partitionInfos = consumer.partitionsFor(inputTopic);
Preconditions.checkArgument(partitionInfos.size() == startOffsetMap.size(), "partition number mismatch with server side");
for (int i = 0; i < partitionInfos.size(); i++) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
index 6774c9d..e8bd76c 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
@@ -21,6 +21,7 @@ package org.apache.kylin.source.kafka.hadoop;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
+import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
@@ -33,6 +34,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,6 +46,8 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
static Logger log = LoggerFactory.getLogger(KafkaInputRecordReader.class);
+ public static final long DEFAULT_KAFKA_CONSUMER_POLL_TIMEOUT = 60000;
+
private Configuration conf;
private KafkaInputSplit split;
@@ -61,8 +65,7 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
private LongWritable key;
private BytesWritable value;
- private long timeOut = 60000;
- private long bufferSize = 65536;
+ private long timeOut = DEFAULT_KAFKA_CONSUMER_POLL_TIMEOUT;
private long numProcessedMessages = 0L;
@@ -82,12 +85,11 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
if (conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TIMEOUT) != null) {
timeOut = Long.parseLong(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TIMEOUT));
}
- if (conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BUFFER_SIZE) != null) {
- bufferSize = Long.parseLong(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BUFFER_SIZE));
- }
-
String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP);
- consumer = org.apache.kylin.source.kafka.util.KafkaClient.getKafkaConsumer(brokers, consumerGroup, null);
+
+ Properties kafkaProperties = KafkaConsumerProperties.getProperties(conf);
+
+ consumer = org.apache.kylin.source.kafka.util.KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties);
earliestOffset = this.split.getOffsetStart();
latestOffset = this.split.getOffsetEnd();
http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
index 3b970b3..69d7440 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
@@ -20,9 +20,9 @@ package org.apache.kylin.source.kafka.util;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.source.kafka.KafkaConfigManager;
@@ -45,43 +45,17 @@ public class KafkaClient {
return consumer;
}
- public static KafkaProducer getKafkaProducer(String brokers, Properties properties) {
- Properties props = constructDefaultKafkaProducerProperties(brokers, properties);
- KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
- return producer;
- }
-
- private static Properties constructDefaultKafkaProducerProperties(String brokers, Properties properties) {
+ private static Properties constructDefaultKafkaConsumerProperties(String brokers, String consumerGroup, Properties properties) {
Properties props = new Properties();
- props.put("bootstrap.servers", brokers);
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("acks", "1");
- props.put("buffer.memory", 33554432);
- props.put("retries", 0);
- props.put("batch.size", 16384);
- props.put("linger.ms", 50);
- props.put("request.timeout.ms", "30000");
if (properties != null) {
for (Map.Entry entry : properties.entrySet()) {
props.put(entry.getKey(), entry.getValue());
}
}
- return props;
- }
-
- private static Properties constructDefaultKafkaConsumerProperties(String brokers, String consumerGroup, Properties properties) {
- Properties props = new Properties();
props.put("bootstrap.servers", brokers);
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("key.deserializer", StringDeserializer.class.getName());
+ props.put("value.deserializer", StringDeserializer.class.getName());
props.put("group.id", consumerGroup);
- props.put("session.timeout.ms", "30000");
- if (properties != null) {
- for (Map.Entry entry : properties.entrySet()) {
- props.put(entry.getKey(), entry.getValue());
- }
- }
props.put("enable.auto.commit", "false");
return props;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
new file mode 100644
index 0000000..8edb84d
--- /dev/null
+++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
@@ -0,0 +1,50 @@
+package org.apache.kylin.source.kafka.config;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Properties;
+
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.xml.sax.SAXException;
+
+public class KafkaConsumerPropertiesTest extends LocalFileMetadataTestCase {
+ @Before
+ public void setUp() throws Exception {
+ this.createTestMetadata();
+ }
+
+ @After
+ public void after() throws Exception {
+ this.cleanupTestMetadata();
+ }
+
+ @Test
+ public void testLoadKafkaProperties() {
+ KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv();
+ assertFalse(kafkaConsumerProperties.getProperties().containsKey("acks"));
+ assertTrue(kafkaConsumerProperties.getProperties().containsKey("session.timeout.ms"));
+ assertEquals("30000", kafkaConsumerProperties.getProperties().getProperty("session.timeout.ms"));
+ }
+
+ @Test
+ public void testLoadKafkaPropertiesAsHadoopJobConf() throws IOException, ParserConfigurationException, SAXException {
+ KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv();
+ Configuration conf = new Configuration(false);
+ conf.addResource(new FileInputStream(new File(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf())), KafkaConsumerProperties.KAFKA_CONSUMER_FILE);
+ assertEquals("30000", conf.get("session.timeout.ms"));
+
+ Properties prop = KafkaConsumerProperties.getProperties(conf);
+ assertEquals("30000", prop.getProperty("session.timeout.ms"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/webapp/app/js/model/streamingModel.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/model/streamingModel.js b/webapp/app/js/model/streamingModel.js
index 0321806..70f27aa 100644
--- a/webapp/app/js/model/streamingModel.js
+++ b/webapp/app/js/model/streamingModel.js
@@ -33,9 +33,7 @@ KylinApp.service('StreamingModel', function () {
"name": "",
"topic": "",
"timeout": "60000",
- "bufferSize": "65536",
"parserName": "org.apache.kylin.source.kafka.TimedJsonStreamParser",
- "margin": "300000",
"clusters":[],
"parserProperties":""
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/webapp/app/partials/cubeDesigner/streamingConfig.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/cubeDesigner/streamingConfig.html b/webapp/app/partials/cubeDesigner/streamingConfig.html
index 8bdcd25..5dc9788 100644
--- a/webapp/app/partials/cubeDesigner/streamingConfig.html
+++ b/webapp/app/partials/cubeDesigner/streamingConfig.html
@@ -238,37 +238,9 @@
</div>
</div>
- <div class="form-group middle-popover" ng-class="{'required':state.mode=='edit'}">
- <div class="row">
- <label class="col-xs-12 col-sm-3 control-label no-padding-right">
- <b>Buffer Size</b>
- <i class="fa fa-info-circle" kylinpopover placement="right" title="Buffer Size" template="BufferSizecTip.html"></i>
- </label>
-
- <div class="col-xs-12 col-sm-6"
- ng-class="{'has-error':form.cube_streaming_form.bufferSize.$invalid && (form.cube_streaming_form.bufferSize.$dirty||form.cube_streaming_form.$submitted)}">
- <input ng-if="state.mode=='edit'" name="bufferSize" required ng-model="kafkaMeta.bufferSize" type="text"
- placeholder="Input kafkaConfig bufferSize"
- ng-pattern="/^\+?[1-9][0-9]*$/"
- class="form-control"/>
- <small class="help-block"
- ng-show="!form.cube_streaming_form.bufferSize.$error.required && form.cube_streaming_form.bufferSize.$invalid && (form.cube_streaming_form.bufferSize.$dirty||form.cube_streaming_form.$submitted)">
- Kafka bufferSize is invalid.
- </small>
- <small class="help-block"
- ng-show="form.cube_streaming_form.bufferSize.$error.required && (form.cube_streaming_form.bufferSize.$dirty||form.cube_streaming_form.$submitted)">
- Kafka bufferSize is required.
- </small>
- <span ng-if="state.mode=='view'">{{kafkaMeta.bufferSize}}</span>
- </div>
- </div>
- </div>
-
</accordion-group>
</accordion>
-
-
</div>
</form>
</div>
@@ -279,11 +251,8 @@
<script type="text/ng-template" id="TimeoutTip.html">
<p>Set timeout for kafka client.</p>
</script>
-<script type="text/ng-template" id="BufferSizecTip.html">
- <p>Set byte size for kafka client's buffer.</p>
- </script>
<script type="text/ng-template" id="MarginTip.html">
- <p>When the messages in kafka is not strictly sorted on timestamp, read more data (expressed in ts) before and after the specified interval to avoid data loss.</p>
+ <p>Deprecated. When the messages in Kafka are not strictly sorted on timestamp, read more data (expressed in ts) before and after the specified interval to avoid data loss.</p>
</script>
<script type="text/ng-template" id="ParserName.html">
<p>Set the parser to parse source data messages. The default parser works for json messages with a timestamp field.</p>