You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/14 01:00:56 UTC
[21/50] [abbrv] incubator-kylin git commit: stream building
stream building
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/66902d01
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/66902d01
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/66902d01
Branch: refs/heads/streaming-localdict
Commit: 66902d017785ef5395405850fbd858ca612b722d
Parents: 2be4e32
Author: qianhao.zhou <qi...@ebay.com>
Authored: Wed Mar 11 15:40:59 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Wed Mar 11 15:40:59 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/invertedindex/index/Slice.java | 8 +++--
.../invertedindex/index/TableRecordInfo.java | 17 ---------
.../invertedindex/model/IIKeyValueCodec.java | 15 ++++----
.../kylin/invertedindex/model/KeyValuePair.java | 23 +++++++++----
.../endpoint/HbaseServerKVIterator.java | 36 +++++++++++++++++---
.../hbase/coprocessor/endpoint/IIEndpoint.java | 3 +-
.../apache/kylin/streaming/StreamBuilder.java | 28 ++++++++-------
.../invertedindex/IIStreamBuilder.java | 31 ++++++++++-------
8 files changed, 100 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/66902d01/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
index 4ac3f01..39a04cf 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
@@ -42,6 +42,7 @@ public class Slice implements Iterable<RawTableRecord>, Comparable<Slice> {
long timestamp;
int nRecords;
ColumnValueContainer[] containers;
+ private Map<Integer, Dictionary<?>> localDictionaries;
public Slice(TableRecordInfoDigest digest, short shard, long timestamp, ColumnValueContainer[] containers) {
this.info = digest;
@@ -59,8 +60,11 @@ public class Slice implements Iterable<RawTableRecord>, Comparable<Slice> {
}
public Map<Integer, Dictionary<?>> getLocalDictionaries() {
- // TODO localdict
- return Collections.emptyMap();
+ return localDictionaries;
+ }
+
+ public void setLocalDictionaries(Map<Integer, Dictionary<?>> localDictionaries) {
+ this.localDictionaries = localDictionaries;
}
public int getRecordCount() {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/66902d01/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
index 68feb03..5535fcc 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
@@ -77,23 +77,6 @@ public class TableRecordInfo {
this.digest = createDigest(dictionaryMap, measureCodecMap);
}
- private List<Integer> getLocalDictColumnList() {
- //TODO localdict
- return Collections.emptyList();
- }
-
-// public void updateDictionary(List<Dictionary<?>> dicts) {
-// List<Integer> columns = getLocalDictColumnList();
-//
-// if (columns.size() != dicts.size()) {
-// throw new RuntimeException("columns size not equal to dicts size");
-// }
-//
-// for (Integer index : columns) {
-// this.dictionaries[index] = dicts.get(index);
-// }
-// }
-
public TableRecordInfoDigest getDigest() {
return digest;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/66902d01/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
index 81f4d56..8ea4e70 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
@@ -18,17 +18,16 @@
package org.apache.kylin.invertedindex.model;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
+import com.google.common.collect.Maps;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Pair;
import com.google.common.collect.Lists;
import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.invertedindex.index.*;
/**
@@ -69,7 +68,7 @@ public class IIKeyValueCodec {
ImmutableBytesWritable key = encodeKey(slice.getShard(),
slice.getTimestamp(), col, -1);
ImmutableBytesWritable value = container.toBytes();
- return new KeyValuePair(key, value);
+ return new KeyValuePair(col, key, value);
}
private List<KeyValuePair> collectKeyValues(Slice slice,
@@ -80,7 +79,7 @@ public class IIKeyValueCodec {
for (int v = 0; v < values.size(); v++) {
ImmutableBytesWritable key = encodeKey(slice.getShard(),
slice.getTimestamp(), col, v);
- list.add(new KeyValuePair(key, values.get(v)));
+ list.add(new KeyValuePair(col, key, values.get(v)));
}
return list;
}
@@ -133,6 +132,7 @@ public class IIKeyValueCodec {
int lastCol = -1;
ColumnValueContainer[] containers = null;
List<ImmutableBytesWritable> bitMapValues = Lists.newArrayList();
+ Map<Integer, Dictionary<?>> localDictionaries = Maps.newHashMap();
Decoder(TableRecordInfoDigest info,
Iterable<KeyValuePair> kvs) {
@@ -151,6 +151,7 @@ public class IIKeyValueCodec {
ImmutableBytesWritable k = kv.getKey();
ImmutableBytesWritable v = kv.getValue();
decodeKey(k);
+ localDictionaries.put(curCol, kv.getDictionary());
if (curShard != lastShard
|| curSliceTimestamp != lastSliceTimestamp) {
@@ -216,11 +217,13 @@ public class IIKeyValueCodec {
if (containers != null) {
next = new Slice(info, lastShard, lastSliceTimestamp,
containers);
+ next.setLocalDictionaries(Maps.newHashMap(localDictionaries));
}
lastSliceTimestamp = Long.MIN_VALUE;
lastCol = -1;
containers = null;
bitMapValues.clear();
+ localDictionaries.clear();
}
private void addBitMapContainer(int col) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/66902d01/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/KeyValuePair.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/KeyValuePair.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/KeyValuePair.java
index 7e43b2a..48c45f6 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/KeyValuePair.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/KeyValuePair.java
@@ -35,27 +35,36 @@
package org.apache.kylin.invertedindex.model;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.dict.Dictionary;
/**
* Created by qianzhou on 3/10/15.
*/
public final class KeyValuePair {
- private ImmutableBytesWritable key;
- private ImmutableBytesWritable value;
- private ImmutableBytesWritable dictionary;
+ private final ImmutableBytesWritable key;
+ private final ImmutableBytesWritable value;
+ private final int id;
+ private Dictionary<?> dictionary;
public KeyValuePair(ImmutableBytesWritable key, ImmutableBytesWritable value) {
- this(key, value, null);
+ this(-1, key, value);
}
-
- public KeyValuePair(ImmutableBytesWritable key, ImmutableBytesWritable value, ImmutableBytesWritable dictionary) {
+ public KeyValuePair(int id, ImmutableBytesWritable key, ImmutableBytesWritable value) {
+ this.id = id;
this.key = key;
this.value = value;
+ }
+
+ public void setDictionary(Dictionary<?> dictionary) {
this.dictionary = dictionary;
}
- public ImmutableBytesWritable getDictionary() {
+ public int getId() {
+ return this.id;
+ }
+
+ public Dictionary<?> getDictionary() {
return dictionary;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/66902d01/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/HbaseServerKVIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/HbaseServerKVIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/HbaseServerKVIterator.java
index 6bcae0d..47aa7d9 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/HbaseServerKVIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/HbaseServerKVIterator.java
@@ -22,10 +22,17 @@ import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.util.Pair;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.invertedindex.model.IIDesc;
import org.apache.kylin.invertedindex.model.KeyValuePair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
import java.io.Closeable;
+import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
@@ -37,6 +44,7 @@ import java.util.List;
public class HbaseServerKVIterator implements Iterable<KeyValuePair>, Closeable {
private RegionScanner innerScanner;
+ private Logger logger = LoggerFactory.getLogger(HbaseServerKVIterator.class);
List<Cell> results = new ArrayList<Cell>();
@@ -55,6 +63,7 @@ public class HbaseServerKVIterator implements Iterable<KeyValuePair>, Closeable
ImmutableBytesWritable key = new ImmutableBytesWritable();
ImmutableBytesWritable value = new ImmutableBytesWritable();
+ ImmutableBytesWritable dict = new ImmutableBytesWritable();
KeyValuePair pair = new KeyValuePair(key, value);
private boolean hasMore = true;
@@ -76,9 +85,28 @@ public class HbaseServerKVIterator implements Iterable<KeyValuePair>, Closeable
if (results.size() < 1)
throw new IllegalStateException("Hbase row contains less than 1 cell");
- Cell c = results.get(0);
- key.set(c.getRowArray(), c.getRowOffset(), c.getRowLength());
- value.set(c.getValueArray(), c.getValueOffset(), c.getValueLength());
+ Dictionary<?> dictionary = null;
+ boolean hasDictData = false;
+ for (Cell c : results) {
+ if (BytesUtil.compareBytes(IIDesc.HBASE_QUALIFIER_BYTES, 0, c.getQualifierArray(), c.getQualifierOffset(), IIDesc.HBASE_QUALIFIER_BYTES.length) == 0) {
+ key.set(c.getRowArray(), c.getRowOffset(), c.getRowLength());
+ value.set(c.getValueArray(), c.getValueOffset(), c.getValueLength());
+ } else if (BytesUtil.compareBytes(IIDesc.HBASE_DICTIONARY_BYTES, 0, c.getQualifierArray(), c.getQualifierOffset(), IIDesc.HBASE_DICTIONARY_BYTES.length) == 0) {
+ dict.set(c.getValueArray(), c.getValueOffset(), c.getValueLength());
+ hasDictData = true;
+ }
+ }
+ if (hasDictData) {
+ try {
+ final DataInputStream in = new DataInputStream(new ByteArrayInputStream(dict.get(), dict.getOffset(), dict.getLength()));
+ String type = in.readUTF();
+ dictionary = (Dictionary<?>) ClassUtil.forName(type, Dictionary.class).newInstance();
+ dictionary.readFields(in);
+ } catch (Exception e) {
+ logger.error("error create dictionary", e);
+ }
+ }
+ pair.setDictionary(dictionary);
results.clear();
return pair;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/66902d01/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
index b72a376..fbe3e8b 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
@@ -63,7 +63,8 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop
private Scan buildScan() {
Scan scan = new Scan();
- scan.addColumn(Bytes.toBytes(IIDesc.HBASE_FAMILY), Bytes.toBytes(IIDesc.HBASE_QUALIFIER));
+ scan.addColumn(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES);
+ scan.addColumn(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES);
return scan;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/66902d01/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index a86a9b1..39c3966 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -60,7 +60,7 @@ public abstract class StreamBuilder implements Runnable {
this.sliceSize = sliceSize;
}
- protected abstract boolean build(List<Stream> streamsToBuild);
+ protected abstract void build(List<Stream> streamsToBuild) throws Exception;
private void clearCounter() {
lastBuildTime = System.currentTimeMillis();
@@ -72,7 +72,13 @@ public abstract class StreamBuilder implements Runnable {
List<Stream> streamToBuild = Lists.newArrayList();
clearCounter();
while (true) {
- final Stream stream = streamQueue.poll(50, TimeUnit.MILLISECONDS);
+ Stream stream;
+ try {
+ stream = streamQueue.poll(50, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ logger.warn("stream queue interrupted", e);
+ continue;
+ }
if (stream == null) {
continue;
} else {
@@ -86,21 +92,19 @@ public abstract class StreamBuilder implements Runnable {
}
streamToBuild.add(stream);
if (streamToBuild.size() >= this.sliceSize) {
- if (build(streamToBuild)) {
- clearCounter();
- streamToBuild.clear();
- }
+ build(streamToBuild);
+ clearCounter();
+ streamToBuild.clear();
} else if ((System.currentTimeMillis() - lastBuildTime) > BATCH_BUILD_INTERVAL_THRESHOLD) {
- if (build(streamToBuild)) {
- clearCounter();
- streamToBuild.clear();
- }
+ build(streamToBuild);
+ clearCounter();
+ streamToBuild.clear();
} else {
continue;
}
}
- } catch (InterruptedException e) {
- logger.error("StreamBuilder has been interrupted", e);
+ } catch (Exception e) {
+ logger.error("build stream error, stop building", e);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/66902d01/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
index 36536bb..a4c7167 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
@@ -60,6 +60,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -90,7 +92,7 @@ public class IIStreamBuilder extends StreamBuilder {
}
@Override
- protected boolean build(List<Stream> streamsToBuild) {
+ protected void build(List<Stream> streamsToBuild) throws IOException {
List<List<String>> table = Lists.transform(streamsToBuild, new Function<Stream, List<String>>() {
@Nullable
@Override
@@ -110,14 +112,8 @@ public class IIStreamBuilder extends StreamBuilder {
TableRecordInfo tableRecordInfo = new TableRecordInfo(desc, dictionaryMap, measureCodecMap);
SliceBuilder sliceBuilder = new SliceBuilder(tableRecordInfo, (short) partitionId);
final Slice slice = buildSlice(table, sliceBuilder, tableRecordInfo);
-// try {
-// loadToHBase(hTable, slice, new IIKeyValueCodec(tableRecordInfo.getDigest()));
-// submitOffset();
-// } catch (IOException e) {
-// logger.error("error load to hbase, build failed", e);
-// return false;
-// }
- return true;
+ loadToHBase(hTable, slice, new IIKeyValueCodec(tableRecordInfo.getDigest()), desc.listAllColumns(), dictionaryMap);
+ submitOffset();
}
private Map<TblColRef, Dictionary<?>> buildDictionary(List<List<String>> table, IIDesc desc) {
@@ -162,7 +158,7 @@ public class IIStreamBuilder extends StreamBuilder {
return sliceBuilder.close();
}
- private void loadToHBase(HTableInterface hTable, Slice slice, IIKeyValueCodec codec) throws IOException {
+ private void loadToHBase(HTableInterface hTable, Slice slice, IIKeyValueCodec codec, List<TblColRef> allColumns, Map<TblColRef, Dictionary<?>> localDictionaries) throws IOException {
try {
List<Put> data = Lists.newArrayList();
for (KeyValuePair pair : codec.encodeKeyValue(slice)) {
@@ -170,9 +166,20 @@ public class IIStreamBuilder extends StreamBuilder {
final byte[] value = pair.getValue().get();
Put put = new Put(key);
put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, value);
+
//dictionary
-// put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, value);
- data.add(put);
+ final int id = pair.getId();
+ if (id >= 0 && id < allColumns.size()) {
+ final Dictionary<?> dictionary = localDictionaries.get(allColumns.get(id));
+ if (dictionary != null) {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(baos);
+ out.writeUTF(dictionary.getClass().getName());
+ dictionary.write(out);
+ put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES, baos.toByteArray());
+ }
+ data.add(put);
+ }
}
hTable.put(data);
} finally {