You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/10/30 10:43:50 UTC

[09/26] kylin git commit: KYLIN-1726 update sampleProducer and kylin.sh

KYLIN-1726 update sampleProducer and kylin.sh

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/7a793e5c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/7a793e5c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/7a793e5c

Branch: refs/heads/KYLIN-1971
Commit: 7a793e5c33f1f7bffd5a53d64ec92065abd5856a
Parents: c804dc8
Author: shaofengshi <sh...@apache.org>
Authored: Tue Oct 25 18:19:40 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Oct 27 09:10:16 2016 +0800

----------------------------------------------------------------------
 build/bin/kylin.sh                              |  7 +++--
 .../source/kafka/util/KafkaSampleProducer.java  | 29 ++++++++++++--------
 2 files changed, 22 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/7a793e5c/build/bin/kylin.sh
----------------------------------------------------------------------
diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh
index 9286055e..ad3a952 100644
--- a/build/bin/kylin.sh
+++ b/build/bin/kylin.sh
@@ -32,7 +32,6 @@ function retrieveDependency() {
     #retrive $hive_dependency and $hbase_dependency
     source ${dir}/find-hive-dependency.sh
     source ${dir}/find-hbase-dependency.sh
-    #source ${dir}/find-kafka-dependency.sh
 
     #retrive $KYLIN_EXTRA_START_OPTS
     if [ -f "${dir}/setenv.sh" ]
@@ -41,7 +40,11 @@ function retrieveDependency() {
 
     export HBASE_CLASSPATH_PREFIX=${KYLIN_HOME}/conf:${KYLIN_HOME}/lib/*:${KYLIN_HOME}/tool/*:${KYLIN_HOME}/ext/*:${HBASE_CLASSPATH_PREFIX}
     export HBASE_CLASSPATH=${HBASE_CLASSPATH}:${hive_dependency}
-    #export HBASE_CLASSPATH=${HBASE_CLASSPATH}:${kafka_dependency}
+    if [ -n "$KAFKA_HOME" ]
+    then
+        source ${dir}/find-kafka-dependency.sh
+        export HBASE_CLASSPATH=${HBASE_CLASSPATH}:${kafka_dependency}
+    fi
 }
 
 # start command

http://git-wip-us.apache.org/repos/asf/kylin/blob/7a793e5c/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
index 3d26d3d..b8f98aa 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
+import java.util.UUID;
 
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
@@ -48,7 +49,6 @@ public class KafkaSampleProducer {
     @SuppressWarnings("static-access")
     private static final Option OPTION_TOPIC = OptionBuilder.withArgName("topic").hasArg().isRequired(true).withDescription("Kafka topic").create("topic");
     private static final Option OPTION_BROKER = OptionBuilder.withArgName("broker").hasArg().isRequired(true).withDescription("Kafka broker").create("broker");
-    private static final Option OPTION_DELAY = OptionBuilder.withArgName("delay").hasArg().isRequired(false).withDescription("Simulated message delay in mili-seconds, default 0").create("delay");
     private static final Option OPTION_INTERVAL = OptionBuilder.withArgName("interval").hasArg().isRequired(false).withDescription("Simulated message interval in mili-seconds, default 1000").create("interval");
 
     private static final ObjectMapper mapper = new ObjectMapper();
@@ -60,21 +60,14 @@ public class KafkaSampleProducer {
         String topic, broker;
         options.addOption(OPTION_TOPIC);
         options.addOption(OPTION_BROKER);
-        options.addOption(OPTION_DELAY);
-        options.addOption(OPTION_INTERVAL);
         optionsHelper.parseOptions(options, args);
 
         logger.info("options: '" + optionsHelper.getOptionsAsString() + "'");
 
         topic = optionsHelper.getOptionValue(OPTION_TOPIC);
         broker = optionsHelper.getOptionValue(OPTION_BROKER);
-        long delay = 0;
-        String delayString = optionsHelper.getOptionValue(OPTION_DELAY);
-        if (delayString != null) {
-            delay = Long.parseLong(delayString);
-        }
 
-        long interval = 1000;
+        long interval = 10;
         String intervalString = optionsHelper.getOptionValue(OPTION_INTERVAL);
         if (intervalString != null) {
             interval = Long.parseLong(intervalString);
@@ -101,6 +94,10 @@ public class KafkaSampleProducer {
         devices.add("Andriod");
         devices.add("Other");
 
+        List<String> genders = new ArrayList();
+        genders.add("Male");
+        genders.add("Female");
+
         Properties props = new Properties();
         props.put("bootstrap.servers", broker);
         props.put("acks", "all");
@@ -117,15 +114,23 @@ public class KafkaSampleProducer {
         Random rnd = new Random();
         Map<String, Object> record = new HashMap();
         while (alive == true) {
-            record.put("order_time", (new Date().getTime() - delay));
+            //add normal record
+            record.put("order_time", (new Date().getTime()));
             record.put("country", countries.get(rnd.nextInt(countries.size())));
             record.put("category", category.get(rnd.nextInt(category.size())));
             record.put("device", devices.get(rnd.nextInt(devices.size())));
             record.put("qty", rnd.nextInt(10));
             record.put("currency", "USD");
             record.put("amount", rnd.nextDouble() * 100);
-            ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, System.currentTimeMillis() + "", mapper.writeValueAsString(record));
-            System.out.println("Sending 1 message");
+            //add embedded record
+            Map<String, Object> user = new HashMap();
+            user.put("id", UUID.randomUUID().toString());
+            user.put("gender", genders.get(rnd.nextInt(2)));
+            user.put("age", rnd.nextInt(20) + 10);
+            record.put("user", user);
+            //send message
+            ProducerRecord<String, String> data = new ProducerRecord<>(topic, System.currentTimeMillis() + "", mapper.writeValueAsString(record));
+            System.out.println("Sending 1 message: " + record.toString());
             producer.send(data);
             Thread.sleep(interval);
         }