You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/14 01:01:18 UTC
[43/50] [abbrv] incubator-kylin git commit: fix
fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/5880b79b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/5880b79b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/5880b79b
Branch: refs/heads/streaming-localdict
Commit: 5880b79bd99d9be52e22022de0b75d0fd4bda8d4
Parents: 73c53b4
Author: qianhao.zhou <qi...@ebay.com>
Authored: Fri Mar 13 16:51:33 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Fri Mar 13 16:51:33 2015 +0800
----------------------------------------------------------------------
.../invertedindex/index/TableRecordInfo.java | 65 ++++++++++----------
.../index/TableRecordInfoDigest.java | 14 ++++-
.../invertedindex/model/IIKeyValueCodec.java | 30 ++++++---
.../storage/hbase/InvertedIndexHBaseTest.java | 26 +++-----
4 files changed, 77 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5880b79b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
index c178742..3f8b72a 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
@@ -18,20 +18,19 @@
package org.apache.kylin.invertedindex.index;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
import com.google.common.collect.Maps;
import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.dict.DictionaryManager;
import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.invertedindex.model.IIDesc;
import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec;
import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.DataType;
import org.apache.kylin.metadata.model.TblColRef;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
/**
* @author yangli9
* <p/>
@@ -46,31 +45,7 @@ public class TableRecordInfo {
final Map<Integer, Dictionary<?>> dictionaryMap;
public TableRecordInfo(IISegment iiSegment) {
-
- this.desc = iiSegment.getIIInstance().getDescriptor();
- this.dictionaryMap = Maps.newHashMap();
- Map<TblColRef, FixedLenMeasureCodec<?>> measureCodecMap = Maps.newHashMap();
-
- DictionaryManager dictMgr = DictionaryManager.getInstance(desc.getConfig());
- int index = 0;
- for (TblColRef tblColRef : desc.listAllColumns()) {
- ColumnDesc col = tblColRef.getColumn();
- if (desc.isMetricsCol(index)) {
- measureCodecMap.put(tblColRef, FixedLenMeasureCodec.get(col.getType()));
- } else {
- String dictPath = iiSegment.getDictResPath(tblColRef);
- if (dictPath != null) {
- try {
- dictionaryMap.put(index, dictMgr.getDictionary(dictPath));
- } catch (IOException e) {
- throw new RuntimeException("dictionary " + dictPath + " does not exist ", e);
- }
- }
- }
- index++;
- }
-
- digest = createDigest(dictionaryMap, measureCodecMap);
+ this(iiSegment.getIIDesc(), Collections.<Integer, Dictionary<?>>emptyMap());
}
public TableRecordInfo(IIDesc desc, Map<Integer, Dictionary<?>> dictionaryMap) {
@@ -127,6 +102,34 @@ public class TableRecordInfo {
return new TableRecordInfoDigest(nColumns, byteFormLen, offsets, dictMaxIds, lengths, isMetric, dataTypes);
}
+ public static TableRecordInfoDigest createDigest(int nColumns, boolean[] isMetric, String[] dataTypes, Map<Integer, Dictionary<?>> dictionaryMap) {
+ int[] dictMaxIds = new int[nColumns];
+ int[] lengths = new int[nColumns];
+ for (int i = 0; i < nColumns; ++i) {
+ if (isMetric[i]) {
+ final FixedLenMeasureCodec<?> fixedLenMeasureCodec = FixedLenMeasureCodec.get(DataType.getInstance(dataTypes[i]));
+ lengths[i] = fixedLenMeasureCodec.getLength();
+ } else {
+ final Dictionary<?> dictionary = dictionaryMap.get(i);
+ if (dictionary != null) {
+ lengths[i] = dictionary.getSizeOfId();
+ dictMaxIds[i] = dictionary.getMaxId();
+ }
+ }
+ }
+ // offsets
+ int pos = 0;
+ int[] offsets = new int[nColumns];
+ for (int i = 0; i < nColumns; i++) {
+ offsets[i] = pos;
+ pos += lengths[i];
+ }
+
+ int byteFormLen = pos;
+
+ return new TableRecordInfoDigest(nColumns, byteFormLen, offsets, dictMaxIds, lengths, isMetric, dataTypes);
+ }
+
public TableRecord createTableRecord() {
return new TableRecord(digest.createTableRecordBytes(), this);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5880b79b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java
index ff6b192..cf519d7 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java
@@ -89,7 +89,19 @@ public class TableRecordInfoDigest {
return ret;
}
- public RawTableRecord createTableRecordBytes() {
+ public boolean[] getIsMetric() {
+ return isMetric;
+ }
+
+ public int[] getLengths() {
+ return lengths;
+ }
+
+ public String[] getMetricDataTypes() {
+ return metricDataTypes;
+ }
+
+ public RawTableRecord createTableRecordBytes() {
return new RawTableRecord(this);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5880b79b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
index ad1b25c..e3284fc 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
@@ -115,16 +115,16 @@ public class IIKeyValueCodec implements KeyValueCodec {
@Override
public Iterable<Slice> decodeKeyValue(Iterable<IIRow> kvs) {
return new IIRowDecoder(digest, kvs.iterator());
-// return new Decoder(kvs, digest);
+// return new Decoder(kvs, incompleteDigest);
}
private static class IIRowDecoder implements Iterable<Slice> {
- private final TableRecordInfoDigest digest;
+ private final TableRecordInfoDigest incompleteDigest;
private final Iterator<IIRow> iterator;
private IIRowDecoder(TableRecordInfoDigest digest, Iterator<IIRow> iterator) {
- this.digest = digest;
+ this.incompleteDigest = digest;
this.iterator = iterator;
}
@@ -139,14 +139,15 @@ public class IIKeyValueCodec implements KeyValueCodec {
@Override
public Slice next() {
int columns = 0;
- ColumnValueContainer[] valueContainers = new ColumnValueContainer[digest.getColumnCount()];
+ ColumnValueContainer[] valueContainers = new ColumnValueContainer[incompleteDigest.getColumnCount()];
Map<Integer, Dictionary<?>> localDictionaries = Maps.newHashMap();
boolean firstTime = true;
short curShard = 0;
long curTimestamp = 0;
short lastShard = 0;
long lastTimestamp = 0;
- while (iterator().hasNext() && columns < digest.getColumnCount()) {
+
+ while (iterator().hasNext() && columns < incompleteDigest.getColumnCount()) {
final IIRow row = iterator.next();
final ImmutableBytesWritable key = row.getKey();
int i = key.getOffset();
@@ -161,16 +162,25 @@ public class IIKeyValueCodec implements KeyValueCodec {
}
int curCol = BytesUtil.readUnsigned(key.get(), i, COLNO_LEN);
- CompressedValueContainer c = new CompressedValueContainer(digest, curCol, 0);
- c.fromBytes(row.getValue());
- valueContainers[curCol] = c;
- localDictionaries.put(curCol, deserialize(row.getDictionary()));
+ final Dictionary<?> dictionary = deserialize(row.getDictionary());
+ if (incompleteDigest.isMetrics(curCol)) {
+ CompressedValueContainer c = new CompressedValueContainer(incompleteDigest, curCol, 0);
+ c.fromBytes(row.getValue());
+ valueContainers[curCol] = c;
+ } else {
+ CompressedValueContainer c = new CompressedValueContainer(dictionary.getSizeOfId(), dictionary.getMaxId() - dictionary.getMinId() + 1, 0);
+ c.fromBytes(row.getValue());
+ valueContainers[curCol] = c;
+ }
+ localDictionaries.put(curCol, dictionary);
columns++;
lastShard = curShard;
lastTimestamp = curTimestamp;
firstTime = false;
}
- Preconditions.checkArgument(columns == digest.getColumnCount(), "column count is " + columns + " should be equals to digest.getColumnCount() " + digest.getColumnCount());
+ Preconditions.checkArgument(columns == incompleteDigest.getColumnCount(), "column count is " + columns + " should be equals to incompleteDigest.getColumnCount() " + incompleteDigest.getColumnCount());
+
+ TableRecordInfoDigest digest = TableRecordInfo.createDigest(columns, incompleteDigest.getIsMetric(), incompleteDigest.getMetricDataTypes(), localDictionaries);
Slice slice = new Slice(digest, curShard, curTimestamp, valueContainers);
slice.setLocalDictionaries(localDictionaries);
return slice;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5880b79b/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
index 9e37447..bcf5b10 100644
--- a/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
@@ -18,23 +18,12 @@
package org.apache.kylin.storage.hbase;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
import com.google.common.collect.Lists;
-
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.invertedindex.IIInstance;
@@ -46,6 +35,11 @@ import org.apache.kylin.invertedindex.index.TableRecord;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.invertedindex.model.IIDesc;
import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
/**
* @author yangli9
@@ -69,7 +63,7 @@ public class InvertedIndexHBaseTest extends HBaseMetadataTestCase {
Configuration hconf = HadoopUtil.newHBaseConfiguration(hbaseUrl);
hconn = HConnectionManager.createConnection(hconf);
- this.info = new TableRecordInfo(seg.getIIDesc(), null);
+ this.info = new TableRecordInfo(seg);
}
@After