You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/02/19 09:16:58 UTC

kylin git commit: Add KafkaSampleProducer for streaming live demo

Repository: kylin
Updated Branches:
  refs/heads/2.0-rc 0b2652282 -> 147986be0


Add KafkaSampleProducer for streaming live demo


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/147986be
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/147986be
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/147986be

Branch: refs/heads/2.0-rc
Commit: 147986be09750b4e0f11a2d3f3e652077322a425
Parents: 0b26522
Author: shaofengshi <sh...@apache.org>
Authored: Fri Feb 19 16:01:36 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Feb 19 16:15:04 2016 +0800

----------------------------------------------------------------------
 build/bin/streaming_build.sh                    |  3 +-
 .../source/kafka/util/KafkaSampleProducer.java  | 99 ++++++++++++++++++++
 2 files changed, 100 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/147986be/build/bin/streaming_build.sh
----------------------------------------------------------------------
diff --git a/build/bin/streaming_build.sh b/build/bin/streaming_build.sh
index cb86e29..a96ecc1 100644
--- a/build/bin/streaming_build.sh
+++ b/build/bin/streaming_build.sh
@@ -23,7 +23,6 @@ source ~/.bash_profile
 STREAMING=$1
 INTERVAL=$2
 DELAY=$3
-MARGIN=$4
 CURRENT_TIME_IN_SECOND=`date +%s`
 CURRENT_TIME=$((CURRENT_TIME_IN_SECOND * 1000))
 START=$(($CURRENT_TIME - CURRENT_TIME%INTERVAL - DELAY))
@@ -31,4 +30,4 @@ END=$(($CURRENT_TIME - CURRENT_TIME%INTERVAL - DELAY + INTERVAL))
 
 ID="$START"_"$END"
 echo "building for ${ID}" >> ${KYLIN_HOME}/logs/build_trace.log
-sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${STREAMING} ${ID} -oneoff true -start ${START} -end ${END} -streaming ${STREAMING} -margin ${MARGIN}
\ No newline at end of file
+sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${STREAMING} ${ID} -start ${START} -end ${END} -streaming ${STREAMING}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/147986be/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
new file mode 100644
index 0000000..1846157
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
@@ -0,0 +1,99 @@
+package org.apache.kylin.source.kafka.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Demo producer that publishes one random JSON order record to a Kafka topic
+ * every two seconds, until the process is stopped or a send/serialization error occurs.
+ *
+ * <p>Options: {@code -topic} and {@code -broker} are required; {@code -delay} is optional
+ * and is subtracted from the current time (milliseconds) to produce "order_time".
+ */
+public class KafkaSampleProducer {
+
+    private static final Logger logger = LoggerFactory.getLogger(KafkaSampleProducer.class);
+    // The annotation applies to a single declaration, so each OptionBuilder chain carries its own.
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_TOPIC = OptionBuilder.withArgName("topic").hasArg().isRequired(true).withDescription("Kafka topic").create("topic");
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_BROKER = OptionBuilder.withArgName("broker").hasArg().isRequired(true).withDescription("Kafka broker").create("broker");
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_DELAY = OptionBuilder.withArgName("delay").hasArg().isRequired(false).withDescription("Simulated message delay").create("delay");
+
+    private static final ObjectMapper mapper = new ObjectMapper();
+
+    /** Pause between two consecutive sample messages, in milliseconds. */
+    private static final long SEND_INTERVAL_MS = 2000L;
+
+    /**
+     * Parses the command line, creates a Kafka producer and sends sample records in an endless loop.
+     *
+     * @param args command-line options: -topic and -broker (required), -delay (optional)
+     * @throws Exception if option parsing, JSON serialization or sending fails, or the sleep is interrupted
+     */
+    public static void main(String[] args) throws Exception {
+        logger.info("args: " + Arrays.toString(args));
+        OptionsHelper optionsHelper = new OptionsHelper();
+        Options options = new Options();
+        options.addOption(OPTION_TOPIC);
+        options.addOption(OPTION_BROKER);
+        options.addOption(OPTION_DELAY);
+        optionsHelper.parseOptions(options, args);
+
+        logger.info("options: '" + optionsHelper.getOptionsAsString() + "'");
+
+        String topic = optionsHelper.getOptionValue(OPTION_TOPIC);
+        String broker = optionsHelper.getOptionValue(OPTION_BROKER);
+        // -delay is optional: fall back to 0 and reuse the value already fetched.
+        String delayString = optionsHelper.getOptionValue(OPTION_DELAY);
+        long delay = (delayString == null) ? 0L : Long.parseLong(delayString);
+
+        List<String> countries = Arrays.asList("AUSTRALIA", "CANADA", "CHINA", "INDIA", "JAPAN", "KOREA", "US", "Other");
+        List<String> category = Arrays.asList("BOOK", "TOY", "CLOTH", "ELECTRONIC", "Other");
+        // "Android" used to be misspelled "Andriod", which leaked into the generated records.
+        List<String> devices = Arrays.asList("iOS", "Windows", "Android", "Other");
+
+        Properties props = new Properties();
+        props.put("metadata.broker.list", broker);
+        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        props.put("request.required.acks", "1");
+
+        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
+        Random rnd = new Random();
+        // One map is reused for every message; each key is overwritten on every iteration.
+        Map<String, Object> record = new HashMap<String, Object>();
+
+        // The loop ends only through an exception or a process kill, so release the producer in finally.
+        try {
+            while (true) {
+                long now = System.currentTimeMillis();
+                record.put("order_time", now - delay);
+                record.put("country", countries.get(rnd.nextInt(countries.size())));
+                record.put("category", category.get(rnd.nextInt(category.size())));
+                record.put("device", devices.get(rnd.nextInt(devices.size())));
+                record.put("qty", rnd.nextInt(10));
+                record.put("currency", "USD");
+                record.put("amount", rnd.nextDouble() * 100);
+                KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, String.valueOf(now), mapper.writeValueAsString(record));
+                System.out.println("Sending 1 message");
+                producer.send(data);
+                Thread.sleep(SEND_INTERVAL_MS);
+            }
+        } finally {
+            producer.close();
+        }
+    }
+
+
+}