You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/02/19 09:16:58 UTC
kylin git commit: Add KafkaSampleProducer for streaming live demo
Repository: kylin
Updated Branches:
refs/heads/2.0-rc 0b2652282 -> 147986be0
Add KafkaSampleProducer for streaming live demo
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/147986be
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/147986be
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/147986be
Branch: refs/heads/2.0-rc
Commit: 147986be09750b4e0f11a2d3f3e652077322a425
Parents: 0b26522
Author: shaofengshi <sh...@apache.org>
Authored: Fri Feb 19 16:01:36 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Feb 19 16:15:04 2016 +0800
----------------------------------------------------------------------
build/bin/streaming_build.sh | 3 +-
.../source/kafka/util/KafkaSampleProducer.java | 99 ++++++++++++++++++++
2 files changed, 100 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/147986be/build/bin/streaming_build.sh
----------------------------------------------------------------------
diff --git a/build/bin/streaming_build.sh b/build/bin/streaming_build.sh
index cb86e29..a96ecc1 100644
--- a/build/bin/streaming_build.sh
+++ b/build/bin/streaming_build.sh
@@ -23,7 +23,6 @@ source ~/.bash_profile
STREAMING=$1
INTERVAL=$2
DELAY=$3
-MARGIN=$4
CURRENT_TIME_IN_SECOND=`date +%s`
CURRENT_TIME=$((CURRENT_TIME_IN_SECOND * 1000))
START=$(($CURRENT_TIME - CURRENT_TIME%INTERVAL - DELAY))
@@ -31,4 +30,4 @@ END=$(($CURRENT_TIME - CURRENT_TIME%INTERVAL - DELAY + INTERVAL))
ID="$START"_"$END"
echo "building for ${ID}" >> ${KYLIN_HOME}/logs/build_trace.log
-sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${STREAMING} ${ID} -oneoff true -start ${START} -end ${END} -streaming ${STREAMING} -margin ${MARGIN}
\ No newline at end of file
+sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${STREAMING} ${ID} -start ${START} -end ${END} -streaming ${STREAMING}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/147986be/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
new file mode 100644
index 0000000..1846157
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
@@ -0,0 +1,99 @@
+package org.apache.kylin.source.kafka.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * A sample producer which will create sample data to kafka topic
+ */
+public class KafkaSampleProducer {
+
+ private static final Logger logger = LoggerFactory.getLogger(KafkaSampleProducer.class);
+ @SuppressWarnings("static-access")
+ private static final Option OPTION_TOPIC = OptionBuilder.withArgName("topic").hasArg().isRequired(true).withDescription("Kafka topic").create("topic");
+ private static final Option OPTION_BROKER = OptionBuilder.withArgName("broker").hasArg().isRequired(true).withDescription("Kafka broker").create("broker");
+ private static final Option OPTION_DELAY = OptionBuilder.withArgName("delay").hasArg().isRequired(false).withDescription("Simulated message delay").create("delay");
+
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ public static void main(String[] args) throws Exception {
+ logger.info("args: " + Arrays.toString(args));
+ OptionsHelper optionsHelper = new OptionsHelper();
+ Options options = new Options();
+ String topic, broker;
+ options.addOption(OPTION_TOPIC);
+ options.addOption(OPTION_BROKER);
+ options.addOption(OPTION_DELAY);
+ optionsHelper.parseOptions(options, args);
+
+ logger.info("options: '" + optionsHelper.getOptionsAsString() + "'");
+
+ topic = optionsHelper.getOptionValue(OPTION_TOPIC);
+ broker = optionsHelper.getOptionValue(OPTION_BROKER);
+ long delay = 0;
+ String delayString = optionsHelper.getOptionValue(OPTION_DELAY);
+ if (delayString != null) {
+ delay = Long.parseLong(optionsHelper.getOptionValue(OPTION_DELAY));
+ }
+
+ List<String> countries = new ArrayList();
+ countries.add("AUSTRALIA");
+ countries.add("CANADA");
+ countries.add("CHINA");
+ countries.add("INDIA");
+ countries.add("JAPAN");
+ countries.add("KOREA");
+ countries.add("US");
+ countries.add("Other");
+ List<String> category = new ArrayList();
+ category.add("BOOK");
+ category.add("TOY");
+ category.add("CLOTH");
+ category.add("ELECTRONIC");
+ category.add("Other");
+ List<String> devices = new ArrayList();
+ devices.add("iOS");
+ devices.add("Windows");
+ devices.add("Andriod");
+ devices.add("Other");
+
+ Properties props = new Properties();
+ props.put("metadata.broker.list", broker);
+ props.put("serializer.class", "kafka.serializer.StringEncoder");
+ props.put("request.required.acks", "1");
+
+ ProducerConfig config = new ProducerConfig(props);
+
+ Producer<String, String> producer = new Producer<String, String>(config);
+
+ boolean alive = true;
+ Random rnd = new Random();
+ Map<String, Object> record = new HashMap();
+ while (alive == true) {
+ record.put("order_time", (new Date().getTime() - delay));
+ record.put("country", countries.get(rnd.nextInt(countries.size())));
+ record.put("category", category.get(rnd.nextInt(category.size())));
+ record.put("device", devices.get(rnd.nextInt(devices.size())));
+ record.put("qty", rnd.nextInt(10));
+ record.put("currency", "USD");
+ record.put("amount", rnd.nextDouble() * 100);
+ KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, System.currentTimeMillis() + "", mapper.writeValueAsString(record));
+ System.out.println("Sending 1 message");
+ producer.send(data);
+ Thread.sleep(2000);
+ }
+ producer.close();
+ }
+
+
+}