You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/05/15 05:06:18 UTC
[06/50] [abbrv] incubator-kylin git commit: KYLIN-749 add stream data
generator
KYLIN-749 add stream data generator
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/6ff8befc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/6ff8befc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/6ff8befc
Branch: refs/heads/streaming-localdict
Commit: 6ff8befcf8eb1a1897f6550651c837c5463a9598
Parents: 6533a33
Author: honma <ho...@ebay.com>
Authored: Tue May 5 14:38:31 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue May 5 14:38:31 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/util/TimeUtil.java | 17 +++++
.../apache/kylin/common/util/TimeUtilTest.java | 44 +++++++++++
.../kylin/job/streaming/StreamingBootstrap.java | 36 +++++----
.../java/org/apache/kylin/job/DataGenTest.java | 4 +
.../job/dataGen/StreamingDataGenerator.java | 78 ++++++++++++++++++++
5 files changed, 160 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6ff8befc/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java b/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
new file mode 100644
index 0000000..01fc2c1
--- /dev/null
+++ b/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
@@ -0,0 +1,46 @@
+package org.apache.kylin.common.util;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 5/4/15.
+ *
+ * Truncates epoch-millisecond timestamps down to the start of the minute or
+ * hour that contains them. Boundaries are multiples of the unit counted from
+ * the epoch, i.e. UTC minute and hour boundaries.
+ */
+public class TimeUtil {
+    /** Milliseconds in one minute. */
+    private static final long ONE_MINUTE_TS = 60 * 1000L;
+    /** Milliseconds in one hour. */
+    private static final long ONE_HOUR_TS = 60 * ONE_MINUTE_TS;
+
+    /**
+     * Returns the start of the minute that contains the given timestamp.
+     *
+     * @param ts timestamp in milliseconds since the epoch; may be negative
+     * @return the largest multiple of one minute that is not greater than ts
+     */
+    public static long getMinuteStart(long ts) {
+        return floorToUnit(ts, ONE_MINUTE_TS);
+    }
+
+    /**
+     * Returns the start of the hour that contains the given timestamp.
+     *
+     * @param ts timestamp in milliseconds since the epoch; may be negative
+     * @return the largest multiple of one hour that is not greater than ts
+     */
+    public static long getHourStart(long ts) {
+        return floorToUnit(ts, ONE_HOUR_TS);
+    }
+
+    /** Rounds ts down (toward negative infinity) to a multiple of unit. */
+    private static long floorToUnit(long ts, long unit) {
+        long remainder = ts % unit;
+        if (remainder < 0) {
+            // Java's % takes the sign of the dividend; shift into [0, unit) so
+            // pre-epoch timestamps round down instead of toward zero.
+            remainder += unit;
+        }
+        return ts - remainder;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6ff8befc/common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java b/common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java
new file mode 100644
index 0000000..1442412
--- /dev/null
+++ b/common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java
@@ -0,0 +1,67 @@
+package org.apache.kylin.common.util;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.TimeZone;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 5/4/15.
+ *
+ * Cross-checks TimeUtil against an independent Calendar-based truncation.
+ * Everything is pinned to UTC: TimeUtil truncates relative to the epoch, so in
+ * a default time zone whose offset is not a whole number of hours (for example
+ * Asia/Kolkata, +05:30) a local-time Calendar would yield a different hour
+ * start and the comparison would fail for reasons unrelated to TimeUtil.
+ */
+public class TimeUtilTest {
+    private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
+
+    /**
+     * Reference implementation: truncates a timestamp to the start of its
+     * minute or hour by copying Calendar fields, evaluated in UTC.
+     *
+     * @param timeMillis timestamp in milliseconds since the epoch
+     * @param unit granularity to truncate to
+     * @return the truncated timestamp in milliseconds since the epoch
+     */
+    public static long normalizeTime(long timeMillis, NormalizeUnit unit) {
+        Calendar source = Calendar.getInstance(UTC);
+        Calendar truncated = Calendar.getInstance(UTC);
+        // Clear first so the fields not passed to set(...) below (seconds,
+        // milliseconds) do not keep the values of the current time.
+        truncated.clear();
+
+        source.setTimeInMillis(timeMillis);
+        if (unit == NormalizeUnit.MINUTE) {
+            truncated.set(source.get(Calendar.YEAR), source.get(Calendar.MONTH), source.get(Calendar.DAY_OF_MONTH), source.get(Calendar.HOUR_OF_DAY), source.get(Calendar.MINUTE));
+        } else if (unit == NormalizeUnit.HOUR) {
+            truncated.set(source.get(Calendar.YEAR), source.get(Calendar.MONTH), source.get(Calendar.DAY_OF_MONTH), source.get(Calendar.HOUR_OF_DAY), 0);
+        }
+        return truncated.getTimeInMillis();
+    }
+
+    /** Compares TimeUtil with the reference implementation on two sample timestamps. */
+    @Test
+    public void basicTest() throws ParseException {
+        java.text.DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+        // Parse in UTC as well so the samples denote the same instants on every machine.
+        dateFormat.setTimeZone(UTC);
+
+        long t1 = dateFormat.parse("2012/01/01 00:00:01").getTime();
+        Assert.assertEquals(normalizeTime(t1, NormalizeUnit.HOUR), TimeUtil.getHourStart(t1));
+        Assert.assertEquals(normalizeTime(t1, NormalizeUnit.MINUTE), TimeUtil.getMinuteStart(t1));
+
+        long t2 = dateFormat.parse("2012/12/31 11:02:01").getTime();
+        Assert.assertEquals(normalizeTime(t2, NormalizeUnit.HOUR), TimeUtil.getHourStart(t2));
+        Assert.assertEquals(normalizeTime(t2, NormalizeUnit.MINUTE), TimeUtil.getMinuteStart(t2));
+    }
+
+    /** Granularities supported by normalizeTime. */
+    public enum NormalizeUnit {
+        MINUTE, HOUR
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6ff8befc/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 3e8f3f4..1688fc2 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -34,27 +34,30 @@
package org.apache.kylin.job.streaming;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
+import java.lang.reflect.Constructor;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
import kafka.api.OffsetRequest;
import kafka.cluster.Broker;
import kafka.javaapi.PartitionMetadata;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
import org.apache.kylin.streaming.*;
import org.apache.kylin.streaming.invertedindex.IIStreamBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.reflect.Constructor;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
/**
* Created by qianzhou on 3/26/15.
@@ -69,6 +72,12 @@ public class StreamingBootstrap {
private Map<String, KafkaConsumer> kafkaConsumers = Maps.newConcurrentMap();
+ private StreamingBootstrap(KylinConfig kylinConfig) {
+ this.kylinConfig = kylinConfig;
+ this.streamingManager = StreamingManager.getInstance(kylinConfig);
+ this.iiManager = IIManager.getInstance(kylinConfig);
+ }
+
public static StreamingBootstrap getInstance(KylinConfig kylinConfig) {
final StreamingBootstrap bootstrap = new StreamingBootstrap(kylinConfig);
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@@ -80,12 +89,6 @@ public class StreamingBootstrap {
return bootstrap;
}
- private StreamingBootstrap(KylinConfig kylinConfig) {
- this.kylinConfig = kylinConfig;
- this.streamingManager = StreamingManager.getInstance(kylinConfig);
- this.iiManager = IIManager.getInstance(kylinConfig);
- }
-
private static Broker getLeadBroker(KafkaConfig kafkaConfig, int partitionId) {
final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaConfig.getTopic(), partitionId, kafkaConfig.getBrokers(), kafkaConfig);
if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
@@ -137,11 +140,7 @@ public class StreamingBootstrap {
streamingOffset = Math.max(streamingOffset, earliestOffset);
logger.info("starting offset is " + streamingOffset);
- if (!HBaseConnection.tableExists(kylinConfig.getStorageUrl(), iiSegment.getStorageLocationIdentifier())) {
- logger.error("no htable:" + iiSegment.getStorageLocationIdentifier() + " found");
- throw new IllegalStateException("please create htable:" + iiSegment.getStorageLocationIdentifier() + " first");
- }
-
+ IICreateHTableJob.main(new String[] { "-iiname", "nous_ii", "-htablename", "KYLIN_2SKJ8JNOUS" });
KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId, streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, parallelism);
kafkaConsumers.put(getKey(streaming, partitionId), consumer);
@@ -166,7 +165,6 @@ public class StreamingBootstrap {
}
}
-
}
private String getKey(String streaming, int partitionId) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6ff8befc/job/src/test/java/org/apache/kylin/job/DataGenTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/DataGenTest.java b/job/src/test/java/org/apache/kylin/job/DataGenTest.java
index 89e964e..c05ddc7 100644
--- a/job/src/test/java/org/apache/kylin/job/DataGenTest.java
+++ b/job/src/test/java/org/apache/kylin/job/DataGenTest.java
@@ -54,4 +54,8 @@ public class DataGenTest extends LocalFileMetadataTestCase {
DeployUtil.overrideFactTableData(content, "default.test_kylin_fact");
}
+ @Test
+ public void testStreaming() throws Exception {
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6ff8befc/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java b/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java
new file mode 100644
index 0000000..d822a9a
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java
@@ -0,0 +1,104 @@
+package org.apache.kylin.job.dataGen;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.TimeUtil;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Random;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 5/5/15.
+ *
+ * Generates random rows for the fact table of the "test_streaming_table"
+ * inverted index.
+ */
+public class StreamingDataGenerator {
+    private static final Logger logger = LoggerFactory.getLogger(StreamingDataGenerator.class);
+    private static final Random random = new Random();
+    private static final String[] decimalFormat = new String[] { "%.4f", "%.5f", "%.6f" };
+
+    /**
+     * Returns an iterator that yields exactly {@code count} random rows.
+     * Each row lists its values in the column order of the fact table.
+     *
+     * @param start lower bound (inclusive) of the generated "ts" values, in milliseconds
+     * @param end upper bound of the generated "ts" values, in milliseconds
+     * @param count number of rows the iterator yields before it is exhausted
+     * @return a one-pass iterator over the generated rows; remove() is not supported
+     */
+    public static Iterator<List<String>> generate(final long start, final long end, final int count) {
+        final KylinConfig config = KylinConfig.getInstanceFromEnv();
+        final IIInstance ii = IIManager.getInstance(config).getII("test_streaming_table");
+        final IIDesc iiDesc = ii.getDescriptor();
+        final MetadataManager metadataManager = MetadataManager.getInstance(config);
+        final ColumnDesc[] columnDescs = metadataManager.getTableDesc(iiDesc.getFactTableName()).getColumns();
+
+        return new Iterator<List<String>>() {
+            private final Map<String, String> values = Maps.newHashMap();
+            // Rows handed out so far; the iterator is exhausted once this reaches count.
+            private int produced = 0;
+
+            @Override
+            public boolean hasNext() {
+                return produced < count;
+            }
+
+            @Override
+            public List<String> next() {
+                if (!hasNext()) {
+                    throw new NoSuchElementException("all " + count + " rows have already been generated");
+                }
+
+                values.clear();
+                long ts = this.createTs(start, end);
+                values.put("ts", Long.toString(ts));
+                values.put("minute_start", Long.toString(TimeUtil.getMinuteStart(ts)));
+                values.put("hour_start", Long.toString(TimeUtil.getHourStart(ts)));
+                values.put("itm", Integer.toString(random.nextInt(20)));
+                values.put("site", Integer.toString(random.nextInt(5)));
+
+                values.put("gmv", String.format(decimalFormat[random.nextInt(decimalFormat.length)], random.nextFloat() * 100));
+                values.put("item_count", Integer.toString(random.nextInt(5)));
+
+                // Guards against the table definition drifting away from the hard-coded columns above.
+                if (values.size() != columnDescs.length) {
+                    throw new RuntimeException("the structure of streaming table has changed, need to modify generator too");
+                }
+
+                // NOTE(review): the lookup is case-sensitive; confirm ColumnDesc.getName()
+                // returns the lower-case names used as keys above, otherwise cells are null.
+                List<String> ret = Lists.newArrayList();
+                for (ColumnDesc columnDesc : columnDescs) {
+                    String name = columnDesc.getName();
+                    ret.add(values.get(name));
+                }
+                produced++;
+                return ret;
+            }
+
+            /** Rows are generated on the fly, so there is nothing to remove. */
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException("remove is not supported by the generated-row iterator");
+            }
+
+            // Random timestamp in [start, end) when end > start.
+            private long createTs(final long start, final long end) {
+                return start + (long) (random.nextDouble() * (end - start));
+            }
+        };
+    }
+
+}