You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/14 01:00:37 UTC

[02/50] [abbrv] incubator-kylin git commit: stream build

stream build


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/3cc984f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/3cc984f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/3cc984f6

Branch: refs/heads/streaming-localdict
Commit: 3cc984f651aa2eeab12a9039e72569d999ddff4d
Parents: cb6d24b
Author: qianhao.zhou <qi...@ebay.com>
Authored: Tue Mar 10 16:56:20 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Tue Mar 10 16:56:20 2015 +0800

----------------------------------------------------------------------
 .../test_kylin_ii_inner_join_desc.json          |  4 +-
 .../test_kylin_ii_left_join_desc.json           |  4 +-
 .../invertedindex/index/TableRecordInfo.java    |  1 +
 .../kylin/invertedindex/model/IIDesc.java       |  2 +
 job/pom.xml                                     |  6 +++
 .../apache/kylin/streaming/StreamBuilder.java   | 41 ++++++++++++-------
 .../invertedindex/IIStreamBuilder.java          | 43 ++++++++++++--------
 7 files changed, 64 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3cc984f6/examples/test_case_data/localmeta/invertedindex_desc/test_kylin_ii_inner_join_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/invertedindex_desc/test_kylin_ii_inner_join_desc.json b/examples/test_case_data/localmeta/invertedindex_desc/test_kylin_ii_inner_join_desc.json
index eb896a0..18958b6 100644
--- a/examples/test_case_data/localmeta/invertedindex_desc/test_kylin_ii_inner_join_desc.json
+++ b/examples/test_case_data/localmeta/invertedindex_desc/test_kylin_ii_inner_join_desc.json
@@ -65,6 +65,6 @@
     "PRICE",
     "ITEM_COUNT"
   ],
-  "sharding": 4,
-  "slice_size": 50000
+  "sharding": 1,
+  "slice_size": 1000
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3cc984f6/examples/test_case_data/localmeta/invertedindex_desc/test_kylin_ii_left_join_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/invertedindex_desc/test_kylin_ii_left_join_desc.json b/examples/test_case_data/localmeta/invertedindex_desc/test_kylin_ii_left_join_desc.json
index 424b688..654cd38 100644
--- a/examples/test_case_data/localmeta/invertedindex_desc/test_kylin_ii_left_join_desc.json
+++ b/examples/test_case_data/localmeta/invertedindex_desc/test_kylin_ii_left_join_desc.json
@@ -65,6 +65,6 @@
     "PRICE",
     "ITEM_COUNT"
   ],
-  "sharding": 4,
-  "slice_size": 50000
+  "sharding": 1,
+  "slice_size": 1000
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3cc984f6/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
index cc229da..68feb03 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
@@ -107,6 +107,7 @@ public class TableRecordInfo {
         for (int i = 0; i < nColumns; ++i) {
             final TblColRef tblColRef = getColumns().get(i);
             isMetric[i] = desc.isMetricsCol(i);
+            dataTypes[i] = tblColRef.getDatatype();
             if (isMetric[i]) {
                 lengths[i] = measureCodecMap.get(tblColRef).getLength();
             } else {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3cc984f6/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
index e094dfc..142ac39 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
@@ -53,8 +53,10 @@ public class IIDesc extends RootPersistentEntity {
 
     public static final String HBASE_FAMILY = "f";
     public static final String HBASE_QUALIFIER = "c";
+    public static final String HBASE_DICTIONARY = "d";
     public static final byte[] HBASE_FAMILY_BYTES = Bytes.toBytes(HBASE_FAMILY);
     public static final byte[] HBASE_QUALIFIER_BYTES = Bytes.toBytes(HBASE_QUALIFIER);
+    public static final byte[] HBASE_DICTIONARY_BYTES = Bytes.toBytes(HBASE_DICTIONARY);
 
     private KylinConfig config;
     private DataModelDesc model;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3cc984f6/job/pom.xml
----------------------------------------------------------------------
diff --git a/job/pom.xml b/job/pom.xml
index 1fff5af..45c47b3 100644
--- a/job/pom.xml
+++ b/job/pom.xml
@@ -58,6 +58,12 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-streaming</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>commons-cli</groupId>
             <artifactId>commons-cli</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3cc984f6/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index 005c255..d86da90 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Created by qianzhou on 2/17/15.
@@ -48,27 +49,21 @@ public abstract class StreamBuilder implements Runnable {
 
     private static final Logger logger = LoggerFactory.getLogger(StreamBuilder.class);
 
-    private static final int BATCH_BUILD_BYTES_THRESHOLD = 64 * 1024;
     private static final int BATCH_BUILD_INTERVAL_THRESHOLD = 5 * 60 * 1000;
+    private final int sliceSize;
 
     private BlockingQueue<Stream> streamQueue;
     private long lastBuildTime = System.currentTimeMillis();
-    private int bytesTotal = 0;
 
-    public StreamBuilder(BlockingQueue<Stream> streamQueue) {
+    public StreamBuilder(BlockingQueue<Stream> streamQueue, int sliceSize) {
         this.streamQueue = streamQueue;
+        this.sliceSize = sliceSize;
     }
 
-    protected abstract void build(List<Stream> streamsToBuild);
-
-    private void buildStream(List<Stream> streams) {
-        build(streams);
-        clearCounter();
-    }
+    protected abstract boolean build(List<Stream> streamsToBuild);
 
     private void clearCounter() {
         lastBuildTime = System.currentTimeMillis();
-        bytesTotal = 0;
     }
 
     @Override
@@ -77,13 +72,29 @@ public abstract class StreamBuilder implements Runnable {
             List<Stream> streamToBuild = Lists.newArrayList();
             clearCounter();
             while (true) {
-                final Stream stream = streamQueue.take();
+                final Stream stream = streamQueue.poll(200, TimeUnit.MILLISECONDS);
+                if (stream == null) {
+                    continue;
+                } else {
+                    if (stream.getOffset() < 0) {
+                        if (!streamToBuild.isEmpty()) {
+                            build(streamToBuild);
+                        }
+                        logger.warn("streaming encountered EOF, stop building");
+                        break;
+                    }
+                }
                 streamToBuild.add(stream);
-                bytesTotal += stream.getRawData().length;
-                if (bytesTotal >= BATCH_BUILD_BYTES_THRESHOLD) {
-                    buildStream(streamToBuild);
+                if (streamToBuild.size() >= this.sliceSize) {
+                    if (build(streamToBuild)) {
+                        clearCounter();
+                        streamToBuild.clear();
+                    }
                 } else if ((System.currentTimeMillis() - lastBuildTime) > BATCH_BUILD_INTERVAL_THRESHOLD) {
-                    buildStream(streamToBuild);
+                    if (build(streamToBuild)) {
+                        clearCounter();
+                        streamToBuild.clear();
+                    }
                 } else {
                     continue;
                 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3cc984f6/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
index b2683b4..8b2673d 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
@@ -36,15 +36,14 @@ package org.apache.kylin.streaming.invertedindex;
 
 import com.google.common.base.Function;
 import com.google.common.collect.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryGenerator;
-import org.apache.kylin.dict.DictionaryInfo;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.dict.lookup.ReadableTable;
 import org.apache.kylin.invertedindex.index.Slice;
 import org.apache.kylin.invertedindex.index.SliceBuilder;
 import org.apache.kylin.invertedindex.index.TableRecord;
@@ -54,7 +53,6 @@ import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
 import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.streaming.KafkaConfig;
 import org.apache.kylin.streaming.Stream;
 import org.apache.kylin.streaming.StreamBuilder;
 import org.slf4j.Logger;
@@ -65,7 +63,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
 
 /**
  * Created by qianzhou on 3/3/15.
@@ -75,15 +73,23 @@ public class IIStreamBuilder extends StreamBuilder {
     private static Logger logger = LoggerFactory.getLogger(IIStreamBuilder.class);
 
     private IIDesc desc = null;
-    private KafkaConfig kafkaConfig = null;
     private HTableInterface hTable = null;
+    private int partitionId = -1;
 
-    public IIStreamBuilder(BlockingQueue<Stream> streamQueue) {
-        super(streamQueue);
+    public IIStreamBuilder(LinkedBlockingDeque<Stream> queue, String hTableName, IIDesc desc, int partitionId) {
+        super(queue, desc.getSliceSize());
+        this.desc = desc;
+        this.partitionId = partitionId;
+        try {
+            this.hTable = HConnectionManager.createConnection(HBaseConfiguration.create()).getTable(hTableName);
+        } catch (IOException e) {
+            logger.error("cannot open htable name:" + hTableName, e);
+            throw new RuntimeException("cannot open htable name:" + hTableName, e);
+        }
     }
 
     @Override
-    protected void build(List<Stream> streamsToBuild) {
+    protected boolean build(List<Stream> streamsToBuild) {
         List<List<String>> table = Lists.transform(streamsToBuild, new Function<Stream, List<String>>() {
             @Nullable
             @Override
@@ -101,14 +107,16 @@ public class IIStreamBuilder extends StreamBuilder {
             }
         }
         TableRecordInfo tableRecordInfo = new TableRecordInfo(desc, dictionaryMap, measureCodecMap);
-        SliceBuilder sliceBuilder = new SliceBuilder(tableRecordInfo, (short) kafkaConfig.getPartitionId());
+        SliceBuilder sliceBuilder = new SliceBuilder(tableRecordInfo, (short) partitionId);
         final Slice slice = buildSlice(table, sliceBuilder, tableRecordInfo);
-        try {
-            loadToHBase(hTable, slice, new IIKeyValueCodec(tableRecordInfo.getDigest()));
-            submitOffset();
-        } catch (IOException e) {
-            logger.error("error load to hbase, build failed", e);
-        }
+//        try {
+//            loadToHBase(hTable, slice, new IIKeyValueCodec(tableRecordInfo.getDigest()));
+//            submitOffset();
+//        } catch (IOException e) {
+//            logger.error("error load to hbase, build failed", e);
+//            return false;
+//        }
+        return true;
     }
 
     private Map<TblColRef, Dictionary<?>> buildDictionary(List<List<String>> table, IIDesc desc) {
@@ -139,8 +147,7 @@ public class IIStreamBuilder extends StreamBuilder {
     }
 
     private List<String> parseStream(Stream stream, IIDesc desc) {
-        List<String> result = Lists.newArrayList();
-        return result;
+        return Lists.newArrayList(new String(stream.getRawData()).split(","));
     }
 
     private Slice buildSlice(List<List<String>> table, SliceBuilder sliceBuilder, TableRecordInfo tableRecordInfo) {