You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/14 01:00:37 UTC
[02/50] [abbrv] incubator-kylin git commit: stream build
stream build
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/3cc984f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/3cc984f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/3cc984f6
Branch: refs/heads/streaming-localdict
Commit: 3cc984f651aa2eeab12a9039e72569d999ddff4d
Parents: cb6d24b
Author: qianhao.zhou <qi...@ebay.com>
Authored: Tue Mar 10 16:56:20 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Tue Mar 10 16:56:20 2015 +0800
----------------------------------------------------------------------
.../test_kylin_ii_inner_join_desc.json | 4 +-
.../test_kylin_ii_left_join_desc.json | 4 +-
.../invertedindex/index/TableRecordInfo.java | 1 +
.../kylin/invertedindex/model/IIDesc.java | 2 +
job/pom.xml | 6 +++
.../apache/kylin/streaming/StreamBuilder.java | 41 ++++++++++++-------
.../invertedindex/IIStreamBuilder.java | 43 ++++++++++++--------
7 files changed, 64 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3cc984f6/examples/test_case_data/localmeta/invertedindex_desc/test_kylin_ii_inner_join_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/invertedindex_desc/test_kylin_ii_inner_join_desc.json b/examples/test_case_data/localmeta/invertedindex_desc/test_kylin_ii_inner_join_desc.json
index eb896a0..18958b6 100644
--- a/examples/test_case_data/localmeta/invertedindex_desc/test_kylin_ii_inner_join_desc.json
+++ b/examples/test_case_data/localmeta/invertedindex_desc/test_kylin_ii_inner_join_desc.json
@@ -65,6 +65,6 @@
"PRICE",
"ITEM_COUNT"
],
- "sharding": 4,
- "slice_size": 50000
+ "sharding": 1,
+ "slice_size": 1000
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3cc984f6/examples/test_case_data/localmeta/invertedindex_desc/test_kylin_ii_left_join_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/invertedindex_desc/test_kylin_ii_left_join_desc.json b/examples/test_case_data/localmeta/invertedindex_desc/test_kylin_ii_left_join_desc.json
index 424b688..654cd38 100644
--- a/examples/test_case_data/localmeta/invertedindex_desc/test_kylin_ii_left_join_desc.json
+++ b/examples/test_case_data/localmeta/invertedindex_desc/test_kylin_ii_left_join_desc.json
@@ -65,6 +65,6 @@
"PRICE",
"ITEM_COUNT"
],
- "sharding": 4,
- "slice_size": 50000
+ "sharding": 1,
+ "slice_size": 1000
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3cc984f6/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
index cc229da..68feb03 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
@@ -107,6 +107,7 @@ public class TableRecordInfo {
for (int i = 0; i < nColumns; ++i) {
final TblColRef tblColRef = getColumns().get(i);
isMetric[i] = desc.isMetricsCol(i);
+ dataTypes[i] = tblColRef.getDatatype();
if (isMetric[i]) {
lengths[i] = measureCodecMap.get(tblColRef).getLength();
} else {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3cc984f6/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
index e094dfc..142ac39 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
@@ -53,8 +53,10 @@ public class IIDesc extends RootPersistentEntity {
public static final String HBASE_FAMILY = "f";
public static final String HBASE_QUALIFIER = "c";
+ public static final String HBASE_DICTIONARY = "d";
public static final byte[] HBASE_FAMILY_BYTES = Bytes.toBytes(HBASE_FAMILY);
public static final byte[] HBASE_QUALIFIER_BYTES = Bytes.toBytes(HBASE_QUALIFIER);
+ public static final byte[] HBASE_DICTIONARY_BYTES = Bytes.toBytes(HBASE_DICTIONARY);
private KylinConfig config;
private DataModelDesc model;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3cc984f6/job/pom.xml
----------------------------------------------------------------------
diff --git a/job/pom.xml b/job/pom.xml
index 1fff5af..45c47b3 100644
--- a/job/pom.xml
+++ b/job/pom.xml
@@ -58,6 +58,12 @@
</dependency>
<dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-streaming</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3cc984f6/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index 005c255..d86da90 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
/**
* Created by qianzhou on 2/17/15.
@@ -48,27 +49,21 @@ public abstract class StreamBuilder implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(StreamBuilder.class);
- private static final int BATCH_BUILD_BYTES_THRESHOLD = 64 * 1024;
private static final int BATCH_BUILD_INTERVAL_THRESHOLD = 5 * 60 * 1000;
+ private final int sliceSize;
private BlockingQueue<Stream> streamQueue;
private long lastBuildTime = System.currentTimeMillis();
- private int bytesTotal = 0;
- public StreamBuilder(BlockingQueue<Stream> streamQueue) {
+ public StreamBuilder(BlockingQueue<Stream> streamQueue, int sliceSize) {
this.streamQueue = streamQueue;
+ this.sliceSize = sliceSize;
}
- protected abstract void build(List<Stream> streamsToBuild);
-
- private void buildStream(List<Stream> streams) {
- build(streams);
- clearCounter();
- }
+ protected abstract boolean build(List<Stream> streamsToBuild);
private void clearCounter() {
lastBuildTime = System.currentTimeMillis();
- bytesTotal = 0;
}
@Override
@@ -77,13 +72,29 @@ public abstract class StreamBuilder implements Runnable {
List<Stream> streamToBuild = Lists.newArrayList();
clearCounter();
while (true) {
- final Stream stream = streamQueue.take();
+ final Stream stream = streamQueue.poll(200, TimeUnit.MILLISECONDS);
+ if (stream == null) {
+ continue;
+ } else {
+ if (stream.getOffset() < 0) {
+ if (!streamToBuild.isEmpty()) {
+ build(streamToBuild);
+ }
+ logger.warn("streaming encountered EOF, stop building");
+ break;
+ }
+ }
streamToBuild.add(stream);
- bytesTotal += stream.getRawData().length;
- if (bytesTotal >= BATCH_BUILD_BYTES_THRESHOLD) {
- buildStream(streamToBuild);
+ if (streamToBuild.size() >= this.sliceSize) {
+ if (build(streamToBuild)) {
+ clearCounter();
+ streamToBuild.clear();
+ }
} else if ((System.currentTimeMillis() - lastBuildTime) > BATCH_BUILD_INTERVAL_THRESHOLD) {
- buildStream(streamToBuild);
+ if (build(streamToBuild)) {
+ clearCounter();
+ streamToBuild.clear();
+ }
} else {
continue;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3cc984f6/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
index b2683b4..8b2673d 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
@@ -36,15 +36,14 @@ package org.apache.kylin.streaming.invertedindex;
import com.google.common.base.Function;
import com.google.common.collect.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.DictionaryGenerator;
-import org.apache.kylin.dict.DictionaryInfo;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.dict.lookup.ReadableTable;
import org.apache.kylin.invertedindex.index.Slice;
import org.apache.kylin.invertedindex.index.SliceBuilder;
import org.apache.kylin.invertedindex.index.TableRecord;
@@ -54,7 +53,6 @@ import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.streaming.KafkaConfig;
import org.apache.kylin.streaming.Stream;
import org.apache.kylin.streaming.StreamBuilder;
import org.slf4j.Logger;
@@ -65,7 +63,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
/**
* Created by qianzhou on 3/3/15.
@@ -75,15 +73,23 @@ public class IIStreamBuilder extends StreamBuilder {
private static Logger logger = LoggerFactory.getLogger(IIStreamBuilder.class);
private IIDesc desc = null;
- private KafkaConfig kafkaConfig = null;
private HTableInterface hTable = null;
+ private int partitionId = -1;
- public IIStreamBuilder(BlockingQueue<Stream> streamQueue) {
- super(streamQueue);
+ public IIStreamBuilder(LinkedBlockingDeque<Stream> queue, String hTableName, IIDesc desc, int partitionId) {
+ super(queue, desc.getSliceSize());
+ this.desc = desc;
+ this.partitionId = partitionId;
+ try {
+ this.hTable = HConnectionManager.createConnection(HBaseConfiguration.create()).getTable(hTableName);
+ } catch (IOException e) {
+ logger.error("cannot open htable name:" + hTableName, e);
+ throw new RuntimeException("cannot open htable name:" + hTableName, e);
+ }
}
@Override
- protected void build(List<Stream> streamsToBuild) {
+ protected boolean build(List<Stream> streamsToBuild) {
List<List<String>> table = Lists.transform(streamsToBuild, new Function<Stream, List<String>>() {
@Nullable
@Override
@@ -101,14 +107,16 @@ public class IIStreamBuilder extends StreamBuilder {
}
}
TableRecordInfo tableRecordInfo = new TableRecordInfo(desc, dictionaryMap, measureCodecMap);
- SliceBuilder sliceBuilder = new SliceBuilder(tableRecordInfo, (short) kafkaConfig.getPartitionId());
+ SliceBuilder sliceBuilder = new SliceBuilder(tableRecordInfo, (short) partitionId);
final Slice slice = buildSlice(table, sliceBuilder, tableRecordInfo);
- try {
- loadToHBase(hTable, slice, new IIKeyValueCodec(tableRecordInfo.getDigest()));
- submitOffset();
- } catch (IOException e) {
- logger.error("error load to hbase, build failed", e);
- }
+// try {
+// loadToHBase(hTable, slice, new IIKeyValueCodec(tableRecordInfo.getDigest()));
+// submitOffset();
+// } catch (IOException e) {
+// logger.error("error load to hbase, build failed", e);
+// return false;
+// }
+ return true;
}
private Map<TblColRef, Dictionary<?>> buildDictionary(List<List<String>> table, IIDesc desc) {
@@ -139,8 +147,7 @@ public class IIStreamBuilder extends StreamBuilder {
}
private List<String> parseStream(Stream stream, IIDesc desc) {
- List<String> result = Lists.newArrayList();
- return result;
+ return Lists.newArrayList(new String(stream.getRawData()).split(","));
}
private Slice buildSlice(List<List<String>> table, SliceBuilder sliceBuilder, TableRecordInfo tableRecordInfo) {