You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/10/30 10:43:50 UTC
[09/26] kylin git commit: KYLIN-1726 update sampleProducer and kylin.sh
KYLIN-1726 update sampleProducer and kylin.sh
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/7a793e5c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/7a793e5c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/7a793e5c
Branch: refs/heads/KYLIN-1971
Commit: 7a793e5c33f1f7bffd5a53d64ec92065abd5856a
Parents: c804dc8
Author: shaofengshi <sh...@apache.org>
Authored: Tue Oct 25 18:19:40 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Oct 27 09:10:16 2016 +0800
----------------------------------------------------------------------
build/bin/kylin.sh | 7 +++--
.../source/kafka/util/KafkaSampleProducer.java | 29 ++++++++++++--------
2 files changed, 22 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/7a793e5c/build/bin/kylin.sh
----------------------------------------------------------------------
diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh
index 9286055e..ad3a952 100644
--- a/build/bin/kylin.sh
+++ b/build/bin/kylin.sh
@@ -32,7 +32,6 @@ function retrieveDependency() {
#retrive $hive_dependency and $hbase_dependency
source ${dir}/find-hive-dependency.sh
source ${dir}/find-hbase-dependency.sh
- #source ${dir}/find-kafka-dependency.sh
#retrive $KYLIN_EXTRA_START_OPTS
if [ -f "${dir}/setenv.sh" ]
@@ -41,7 +40,11 @@ function retrieveDependency() {
export HBASE_CLASSPATH_PREFIX=${KYLIN_HOME}/conf:${KYLIN_HOME}/lib/*:${KYLIN_HOME}/tool/*:${KYLIN_HOME}/ext/*:${HBASE_CLASSPATH_PREFIX}
export HBASE_CLASSPATH=${HBASE_CLASSPATH}:${hive_dependency}
- #export HBASE_CLASSPATH=${HBASE_CLASSPATH}:${kafka_dependency}
+ if [ -n "$KAFKA_HOME" ]
+ then
+ source ${dir}/find-kafka-dependency.sh
+ export HBASE_CLASSPATH=${HBASE_CLASSPATH}:${kafka_dependency}
+ fi
}
# start command
http://git-wip-us.apache.org/repos/asf/kylin/blob/7a793e5c/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
index 3d26d3d..b8f98aa 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
+import java.util.UUID;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
@@ -48,7 +49,6 @@ public class KafkaSampleProducer {
@SuppressWarnings("static-access")
private static final Option OPTION_TOPIC = OptionBuilder.withArgName("topic").hasArg().isRequired(true).withDescription("Kafka topic").create("topic");
private static final Option OPTION_BROKER = OptionBuilder.withArgName("broker").hasArg().isRequired(true).withDescription("Kafka broker").create("broker");
- private static final Option OPTION_DELAY = OptionBuilder.withArgName("delay").hasArg().isRequired(false).withDescription("Simulated message delay in mili-seconds, default 0").create("delay");
private static final Option OPTION_INTERVAL = OptionBuilder.withArgName("interval").hasArg().isRequired(false).withDescription("Simulated message interval in mili-seconds, default 1000").create("interval");
private static final ObjectMapper mapper = new ObjectMapper();
@@ -60,21 +60,14 @@ public class KafkaSampleProducer {
String topic, broker;
options.addOption(OPTION_TOPIC);
options.addOption(OPTION_BROKER);
- options.addOption(OPTION_DELAY);
- options.addOption(OPTION_INTERVAL);
optionsHelper.parseOptions(options, args);
logger.info("options: '" + optionsHelper.getOptionsAsString() + "'");
topic = optionsHelper.getOptionValue(OPTION_TOPIC);
broker = optionsHelper.getOptionValue(OPTION_BROKER);
- long delay = 0;
- String delayString = optionsHelper.getOptionValue(OPTION_DELAY);
- if (delayString != null) {
- delay = Long.parseLong(delayString);
- }
- long interval = 1000;
+ long interval = 10;
String intervalString = optionsHelper.getOptionValue(OPTION_INTERVAL);
if (intervalString != null) {
interval = Long.parseLong(intervalString);
@@ -101,6 +94,10 @@ public class KafkaSampleProducer {
devices.add("Andriod");
devices.add("Other");
+ List<String> genders = new ArrayList();
+ genders.add("Male");
+ genders.add("Female");
+
Properties props = new Properties();
props.put("bootstrap.servers", broker);
props.put("acks", "all");
@@ -117,15 +114,23 @@ public class KafkaSampleProducer {
Random rnd = new Random();
Map<String, Object> record = new HashMap();
while (alive == true) {
- record.put("order_time", (new Date().getTime() - delay));
+ //add normal record
+ record.put("order_time", (new Date().getTime()));
record.put("country", countries.get(rnd.nextInt(countries.size())));
record.put("category", category.get(rnd.nextInt(category.size())));
record.put("device", devices.get(rnd.nextInt(devices.size())));
record.put("qty", rnd.nextInt(10));
record.put("currency", "USD");
record.put("amount", rnd.nextDouble() * 100);
- ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, System.currentTimeMillis() + "", mapper.writeValueAsString(record));
- System.out.println("Sending 1 message");
+ //add embedded record
+ Map<String, Object> user = new HashMap();
+ user.put("id", UUID.randomUUID().toString());
+ user.put("gender", genders.get(rnd.nextInt(2)));
+ user.put("age", rnd.nextInt(20) + 10);
+ record.put("user", user);
+ //send message
+ ProducerRecord<String, String> data = new ProducerRecord<>(topic, System.currentTimeMillis() + "", mapper.writeValueAsString(record));
+ System.out.println("Sending 1 message: " + record.toString());
producer.send(data);
Thread.sleep(interval);
}