You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/05/15 05:06:18 UTC

[06/50] [abbrv] incubator-kylin git commit: KYLIN-749 add stream data generator

KYLIN-749 add stream data generator


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/6ff8befc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/6ff8befc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/6ff8befc

Branch: refs/heads/streaming-localdict
Commit: 6ff8befcf8eb1a1897f6550651c837c5463a9598
Parents: 6533a33
Author: honma <ho...@ebay.com>
Authored: Tue May 5 14:38:31 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue May 5 14:38:31 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/util/TimeUtil.java  | 17 +++++
 .../apache/kylin/common/util/TimeUtilTest.java  | 44 +++++++++++
 .../kylin/job/streaming/StreamingBootstrap.java | 36 +++++----
 .../java/org/apache/kylin/job/DataGenTest.java  |  4 +
 .../job/dataGen/StreamingDataGenerator.java     | 78 ++++++++++++++++++++
 5 files changed, 160 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6ff8befc/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java b/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
new file mode 100644
index 0000000..01fc2c1
--- /dev/null
+++ b/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
@@ -0,0 +1,17 @@
+package org.apache.kylin.common.util;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 5/4/15.
+ */
+public class TimeUtil {
+    private static int ONE_MINUTE_TS = 60 * 1000;
+    private static int ONE_HOUR_TS = 60 * 60 * 1000;
+
+    public static long getMinuteStart(long ts) {
+        return ts / ONE_MINUTE_TS * ONE_MINUTE_TS;
+    }
+
+    public static long getHourStart(long ts) {
+        return ts / ONE_HOUR_TS * ONE_HOUR_TS;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6ff8befc/common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java b/common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java
new file mode 100644
index 0000000..1442412
--- /dev/null
+++ b/common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java
@@ -0,0 +1,44 @@
+package org.apache.kylin.common.util;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 5/4/15.
+ * <p>
+ * Cross-checks {@link TimeUtil} against a {@link Calendar}-based reference truncation.
+ * Both the reference calendar and the date parser are pinned to UTC: {@link TimeUtil}
+ * aligns on raw epoch milliseconds, so comparing it with local wall-clock truncation
+ * would make the hour assertions fail on hosts whose zone offset is not a whole
+ * number of hours (e.g. UTC+05:30).
+ */
+public class TimeUtilTest {
+    /** Zone in which epoch-millisecond boundaries coincide with calendar boundaries. */
+    private static final java.util.TimeZone UTC = java.util.TimeZone.getTimeZone("UTC");
+
+    /**
+     * Reference truncation of {@code timeMillis} to the start of its UTC minute or hour.
+     *
+     * @param timeMillis epoch milliseconds
+     * @param unit       granularity to truncate to
+     * @return the truncated epoch milliseconds
+     */
+    public static long normalizeTime(long timeMillis, NormalizeUnit unit) {
+        Calendar a = Calendar.getInstance(UTC);
+        Calendar b = Calendar.getInstance(UTC);
+        // clear() resets every field (seconds, millis included) but keeps the UTC zone
+        b.clear();
+
+        a.setTimeInMillis(timeMillis);
+        if (unit == NormalizeUnit.MINUTE) {
+            b.set(a.get(Calendar.YEAR), a.get(Calendar.MONTH), a.get(Calendar.DAY_OF_MONTH), a.get(Calendar.HOUR_OF_DAY), a.get(Calendar.MINUTE));
+        } else if (unit == NormalizeUnit.HOUR) {
+            b.set(a.get(Calendar.YEAR), a.get(Calendar.MONTH), a.get(Calendar.DAY_OF_MONTH), a.get(Calendar.HOUR_OF_DAY), 0);
+        }
+        return b.getTimeInMillis();
+    }
+
+    @Test
+    public void basicTest() throws ParseException {
+        java.text.DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+        // interpret the literals below as UTC so the test is independent of the host zone
+        dateFormat.setTimeZone(UTC);
+
+        long t1 = dateFormat.parse("2012/01/01 00:00:01").getTime();
+        Assert.assertEquals(normalizeTime(t1, NormalizeUnit.HOUR), TimeUtil.getHourStart(t1));
+        Assert.assertEquals(normalizeTime(t1, NormalizeUnit.MINUTE), TimeUtil.getMinuteStart(t1));
+
+        long t2 = dateFormat.parse("2012/12/31 11:02:01").getTime();
+        Assert.assertEquals(normalizeTime(t2, NormalizeUnit.HOUR), TimeUtil.getHourStart(t2));
+        Assert.assertEquals(normalizeTime(t2, NormalizeUnit.MINUTE), TimeUtil.getMinuteStart(t2));
+    }
+
+    /** Granularities supported by {@link #normalizeTime(long, NormalizeUnit)}. */
+    public enum NormalizeUnit {
+        MINUTE, HOUR
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6ff8befc/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 3e8f3f4..1688fc2 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -34,27 +34,30 @@
 
 package org.apache.kylin.job.streaming;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
+import java.lang.reflect.Constructor;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
 import kafka.api.OffsetRequest;
 import kafka.cluster.Broker;
 import kafka.javaapi.PartitionMetadata;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
 import org.apache.kylin.streaming.*;
 import org.apache.kylin.streaming.invertedindex.IIStreamBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.Constructor;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 
 /**
  * Created by qianzhou on 3/26/15.
@@ -69,6 +72,12 @@ public class StreamingBootstrap {
 
     private Map<String, KafkaConsumer> kafkaConsumers = Maps.newConcurrentMap();
 
+    private StreamingBootstrap(KylinConfig kylinConfig) {
+        this.kylinConfig = kylinConfig;
+        this.streamingManager = StreamingManager.getInstance(kylinConfig);
+        this.iiManager = IIManager.getInstance(kylinConfig);
+    }
+
     public static StreamingBootstrap getInstance(KylinConfig kylinConfig) {
         final StreamingBootstrap bootstrap = new StreamingBootstrap(kylinConfig);
         Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@@ -80,12 +89,6 @@ public class StreamingBootstrap {
         return bootstrap;
     }
 
-    private StreamingBootstrap(KylinConfig kylinConfig) {
-        this.kylinConfig = kylinConfig;
-        this.streamingManager = StreamingManager.getInstance(kylinConfig);
-        this.iiManager = IIManager.getInstance(kylinConfig);
-    }
-
     private static Broker getLeadBroker(KafkaConfig kafkaConfig, int partitionId) {
         final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaConfig.getTopic(), partitionId, kafkaConfig.getBrokers(), kafkaConfig);
         if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
@@ -137,11 +140,7 @@ public class StreamingBootstrap {
         streamingOffset = Math.max(streamingOffset, earliestOffset);
         logger.info("starting offset is " + streamingOffset);
 
-        if (!HBaseConnection.tableExists(kylinConfig.getStorageUrl(), iiSegment.getStorageLocationIdentifier())) {
-            logger.error("no htable:" + iiSegment.getStorageLocationIdentifier() + " found");
-            throw new IllegalStateException("please create htable:" + iiSegment.getStorageLocationIdentifier() + " first");
-        }
-
+        IICreateHTableJob.main(new String[] { "-iiname", "nous_ii", "-htablename", "KYLIN_2SKJ8JNOUS" });
 
         KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId, streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, parallelism);
         kafkaConsumers.put(getKey(streaming, partitionId), consumer);
@@ -166,7 +165,6 @@ public class StreamingBootstrap {
             }
         }
 
-
     }
 
     private String getKey(String streaming, int partitionId) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6ff8befc/job/src/test/java/org/apache/kylin/job/DataGenTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/DataGenTest.java b/job/src/test/java/org/apache/kylin/job/DataGenTest.java
index 89e964e..c05ddc7 100644
--- a/job/src/test/java/org/apache/kylin/job/DataGenTest.java
+++ b/job/src/test/java/org/apache/kylin/job/DataGenTest.java
@@ -54,4 +54,8 @@ public class DataGenTest extends LocalFileMetadataTestCase {
         DeployUtil.overrideFactTableData(content, "default.test_kylin_fact");
     }
 
+    /**
+     * Placeholder for a streaming data-generation test. The body is currently empty,
+     * so this test asserts nothing and always passes.
+     */
+    @Test
+    public void testStreaming() throws Exception {
+        // NOTE(review): presumably intended to drive StreamingDataGenerator.generate(...) — TODO confirm and implement
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6ff8befc/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java b/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java
new file mode 100644
index 0000000..d822a9a
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java
@@ -0,0 +1,78 @@
+package org.apache.kylin.job.dataGen;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.TimeUtil;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 5/5/15.
+ * <p>
+ * Produces random rows for the fact table of the {@code test_streaming_table}
+ * inverted index, for use in streaming tests.
+ */
+public class StreamingDataGenerator {
+    private static final Logger logger = LoggerFactory.getLogger(StreamingDataGenerator.class);
+    private static final Random random = new Random();
+    private static final String[] decimalFormat = new String[] { "%.4f", "%.5f", "%.6f" };
+
+    /**
+     * Returns an iterator that yields {@code count} randomly generated rows.
+     * <p>
+     * Each row lists one value per column of the II's fact table, in the order returned
+     * by the table metadata. When {@code start < end}, the {@code ts} column falls in
+     * [{@code start}, {@code end}); {@code minute_start}/{@code hour_start} are derived from it.
+     *
+     * @param start lower bound of the generated {@code ts} values, epoch milliseconds
+     * @param end   upper bound of the generated {@code ts} values, epoch milliseconds
+     * @param count number of rows the iterator yields; values {@code <= 0} yield none
+     * @return a single-use iterator; {@link Iterator#remove()} is not supported
+     * @throws IllegalStateException if the {@code test_streaming_table} II is not registered
+     */
+    public static Iterator<List<String>> generate(final long start, final long end, final int count) {
+        final KylinConfig config = KylinConfig.getInstanceFromEnv();
+        final IIInstance ii = IIManager.getInstance(config).getII("test_streaming_table");
+        if (ii == null) {
+            throw new IllegalStateException("inverted index test_streaming_table not found");
+        }
+        final IIDesc iiDesc = ii.getDescriptor();
+        final MetadataManager metadataManager = MetadataManager.getInstance(config);
+        final ColumnDesc[] columnDescs = metadataManager.getTableDesc(iiDesc.getFactTableName()).getColumns();
+
+        return new Iterator<List<String>>() {
+            // per-row scratch map, keyed by lower-case column name
+            private final Map<String, String> values = Maps.newHashMap();
+            // number of rows handed out so far
+            private int emitted = 0;
+
+            @Override
+            public boolean hasNext() {
+                return emitted < count;
+            }
+
+            @Override
+            public List<String> next() {
+                if (!hasNext()) {
+                    throw new java.util.NoSuchElementException("all " + count + " rows already generated");
+                }
+                emitted++;
+
+                values.clear();
+                long ts = createTs(start, end);
+                values.put("ts", Long.toString(ts));
+                values.put("minute_start", Long.toString(TimeUtil.getMinuteStart(ts)));
+                values.put("hour_start", Long.toString(TimeUtil.getHourStart(ts)));
+                values.put("itm", Integer.toString(random.nextInt(20)));
+                values.put("site", Integer.toString(random.nextInt(5)));
+
+                // Locale.ROOT keeps '.' as the decimal separator regardless of the host locale
+                values.put("gmv", String.format(java.util.Locale.ROOT, decimalFormat[random.nextInt(3)], random.nextFloat() * 100));
+                values.put("item_count", Integer.toString(random.nextInt(5)));
+
+                if (values.size() != columnDescs.length) {
+                    throw new RuntimeException("the structure of streaming table has changed, need to modify generator too");
+                }
+
+                List<String> ret = Lists.newArrayListWithCapacity(columnDescs.length);
+                for (ColumnDesc columnDesc : columnDescs) {
+                    // keys above are lower-case; normalize so a differently-cased metadata
+                    // column name does not silently produce a null value
+                    String value = values.get(columnDesc.getName().toLowerCase(java.util.Locale.ROOT));
+                    if (value == null) {
+                        throw new RuntimeException("the structure of streaming table has changed, need to modify generator too");
+                    }
+                    ret.add(value);
+                }
+                return ret;
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException("generated rows cannot be removed");
+            }
+
+            /** Picks a uniformly random timestamp between {@code from} (inclusive) and {@code to}. */
+            private long createTs(final long from, final long to) {
+                return from + (long) (random.nextDouble() * (to - from));
+            }
+        };
+    }
+
+}