You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/14 01:01:23 UTC
[48/50] [abbrv] incubator-kylin git commit: KYLIN-625, draft impl,
ready for very first test
KYLIN-625, draft impl, ready for very first test
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/4e290fb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/4e290fb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/4e290fb0
Branch: refs/heads/streaming-localdict
Commit: 4e290fb07a842a874421f0ce7618d4469dbd95d0
Parents: 5059723
Author: Li, Yang <ya...@ebay.com>
Authored: Fri Mar 13 18:41:55 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Mar 13 18:41:55 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/util/ByteArray.java | 72 ++++++-
.../org/apache/kylin/common/util/BytesUtil.java | 21 +++
.../kylin/dict/lookup/LookupBytesTable.java | 2 +-
.../cube/FactDistinctColumnsCombiner.java | 2 +-
.../hadoop/cube/FactDistinctColumnsReducer.java | 2 +-
.../job/hadoop/cube/NewBaseCuboidMapper.java | 2 +-
.../IIDistinctColumnsCombiner.java | 2 +-
.../invertedindex/IIDistinctColumnsReducer.java | 2 +-
.../kylin/metadata/filter/CaseTupleFilter.java | 7 +-
.../metadata/filter/ColumnTupleFilter.java | 7 +-
.../metadata/filter/CompareTupleFilter.java | 7 +-
.../metadata/filter/ConstantTupleFilter.java | 7 +-
.../metadata/filter/DynamicTupleFilter.java | 7 +-
.../metadata/filter/ExtractTupleFilter.java | 7 +-
.../metadata/filter/IFilterCodeSystem.java | 30 +++
.../metadata/filter/LogicalTupleFilter.java | 13 +-
.../kylin/metadata/filter/StringCodeSystem.java | 15 +-
.../kylin/metadata/filter/TupleFilter.java | 7 +-
.../metadata/filter/TupleFilterSerializer.java | 11 +-
.../metadata/measure/BigDecimalSerializer.java | 75 --------
.../metadata/measure/DataTypeSerializer.java | 83 --------
.../metadata/measure/DoubleSerializer.java | 60 ------
.../kylin/metadata/measure/HLLCSerializer.java | 72 -------
.../kylin/metadata/measure/LongSerializer.java | 67 -------
.../kylin/metadata/measure/MeasureCodec.java | 2 +-
.../serializer/BigDecimalSerializer.java | 75 ++++++++
.../metadata/serializer/DataTypeSerializer.java | 87 +++++++++
.../metadata/serializer/DoubleSerializer.java | 60 ++++++
.../metadata/serializer/HLLCSerializer.java | 72 +++++++
.../metadata/serializer/LongSerializer.java | 67 +++++++
.../metadata/serializer/StringSerializer.java | 30 +++
.../kylin/metadata/tuple/ICodeSystem.java | 30 ---
.../storage/gridtable/GTAggregateScanner.java | 144 ++++++++++++++
.../kylin/storage/gridtable/GTBuilder.java | 32 +---
.../kylin/storage/gridtable/GTCodeSystem.java | 76 ++++++++
.../apache/kylin/storage/gridtable/GTInfo.java | 188 ++++++++++++++++---
.../kylin/storage/gridtable/GTRawScanner.java | 169 +++++++++++++++++
.../kylin/storage/gridtable/GTRecord.java | 131 ++++++++++++-
.../kylin/storage/gridtable/GTRowBlock.java | 103 ++++++++--
.../apache/kylin/storage/gridtable/GTStore.java | 35 ----
.../kylin/storage/gridtable/GridTable.java | 25 ++-
.../kylin/storage/gridtable/IGTCodeSystem.java | 25 +++
.../kylin/storage/gridtable/IGTScanner.java | 7 +
.../kylin/storage/gridtable/IGTStore.java | 37 ++++
.../kylin/storage/gridtable/IKVStoreReader.java | 11 --
.../kylin/storage/gridtable/IKVStoreWriter.java | 11 --
.../gridtable/memstore/GTSimpleMemStore.java | 96 ++++++++++
.../hbase/coprocessor/DictCodeSystem.java | 19 +-
.../kylin/storage/filter/FilterBaseTest.java | 3 +-
49 files changed, 1521 insertions(+), 594 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/common/src/main/java/org/apache/kylin/common/util/ByteArray.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/ByteArray.java b/common/src/main/java/org/apache/kylin/common/util/ByteArray.java
index df6f01b..3d3a291 100644
--- a/common/src/main/java/org/apache/kylin/common/util/ByteArray.java
+++ b/common/src/main/java/org/apache/kylin/common/util/ByteArray.java
@@ -18,7 +18,7 @@
package org.apache.kylin.common.util;
-import java.util.Arrays;
+import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.util.Bytes;
@@ -27,15 +27,65 @@ import org.apache.hadoop.hbase.util.Bytes;
*/
public class ByteArray implements Comparable<ByteArray> {
- public byte[] data;
+ /** Creates a ByteArray that wraps a newly allocated array of the given length. */
+ public static ByteArray allocate(int length) {
+ return new ByteArray(new byte[length]);
+ }
+
+ // ============================================================================
+
+ // backing array; null when created through the no-arg constructor
+ private byte[] data;
+ // start position of this view inside the backing array
+ private int offset;
+ // number of bytes covered by this view
+ private int length;
+
+ /** Creates an instance with no backing array (null, offset 0, length 0). */
+ public ByteArray() {
+ set(null, 0, 0);
+ }
+ /** Wraps the whole given array. The argument must not be null because data.length is read. */
public ByteArray(byte[] data) {
- this.data = data;
+ set(data, 0, data.length);
+ }
+
+ /** Wraps the given range of the array; the array is referenced, not copied. */
+ public ByteArray(byte[] data, int offset, int length) {
+ set(data, offset, length);
+ }
+
+ /** Returns the backing array itself (not a copy); callers must honour offset() and length(). */
+ public byte[] array() {
+ return data;
+ }
+
+ /** Returns the start position of this view inside the backing array. */
+ public int offset() {
+ return offset;
+ }
+
+ /** Returns the number of bytes covered by this view. */
+ public int length() {
+ return length;
+ }
+
+ /** Re-points this instance at the whole given array. The argument must not be null because array.length is read. */
+ public void set(byte[] array) {
+ set(array, 0, array.length);
+ }
+
+ /** Re-points this instance at the given range of the array; no bounds check is done here. */
+ public void set(byte[] array, int offset, int length) {
+ this.data = array;
+ this.offset = offset;
+ this.length = length;
+ }
+
+ /** Makes this instance refer to the same array, offset and length as the given one. */
+ public void set(ByteArray o) {
+ set(o.data, o.offset, o.length);
+ }
+
+ /** Changes only the length of the view; the backing array and the offset are kept. */
+ public void setLength(int length) {
+ this.length = length;
+ }
+
+ /** Returns a ByteBuffer wrapping the backing array over [offset, offset + length), without copying. */
+ public ByteBuffer wrapAsBuffer() {
+ return ByteBuffer.wrap(data, offset, length);
}
@Override
public int hashCode() {
- return Bytes.hashCode(data);
+ return Bytes.hashCode(data, offset, length);
}
@Override
@@ -46,14 +96,18 @@ public class ByteArray implements Comparable<ByteArray> {
return false;
if (getClass() != obj.getClass())
return false;
- ByteArray other = (ByteArray) obj;
- if (!Arrays.equals(data, other.data))
- return false;
- return true;
+ ByteArray o = (ByteArray) obj;
+ return Bytes.equals(this.data, this.offset, this.length, o.data, o.offset, o.length);
}
@Override
public int compareTo(ByteArray o) {
- return Bytes.compareTo(this.data, o.data);
+ return Bytes.compareTo(this.data, this.offset, this.length, o.data, o.offset, o.length);
+ }
+
+ @Override
+ public String toString() {
+ return Bytes.toString(data, offset, length);
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/BytesUtil.java b/common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
index 2cd8d7d..dbd459d 100644
--- a/common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
+++ b/common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
@@ -311,6 +311,15 @@ public class BytesUtil {
out.put(array);
}
+ /**
+ * Writes a slice of a byte array to the buffer as a variable-length int length prefix
+ * followed by the bytes. A null array is written as the single prefix -1.
+ *
+ * @param array source array, may be null
+ * @param offset index of the first byte to write
+ * @param length number of bytes to write
+ * @param out destination buffer, advanced past the written data
+ */
+ public static void writeByteArray(byte[] array, int offset, int length, ByteBuffer out) {
+ if (array == null) {
+ writeVInt(-1, out);
+ return;
+ }
+ // The prefix must equal the number of bytes actually written. Writing array.length
+ // here made the prefix disagree with the payload whenever the slice was not the
+ // whole array, so readByteArray() would consume the wrong number of bytes.
+ writeVInt(length, out);
+ out.put(array, offset, length);
+ }
+
public static byte[] readByteArray(ByteBuffer in) {
int len = readVInt(in);
if (len < 0)
@@ -320,6 +329,18 @@ public class BytesUtil {
in.get(array);
return array;
}
+
+ /**
+ * Reports how many bytes the next length-prefixed byte array occupies in the buffer
+ * (prefix plus payload) without consuming it; the buffer position is restored.
+ * A negative prefix contributes no payload bytes, so only the prefix size is returned.
+ */
+ public static int peekByteArrayLength(ByteBuffer in) {
+ final int mark = in.position();
+ final int payloadLen = readVInt(in);
+ final int prefixLen = in.position() - mark;
+ in.position(mark); // rewind so the caller still sees the unread array
+
+ return prefixLen + Math.max(payloadLen, 0);
+ }
public static void writeBooleanArray(boolean[] array, ByteBuffer out) {
if (array == null) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupBytesTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupBytesTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupBytesTable.java
index 0b40be0..6e5f7d9 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupBytesTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupBytesTable.java
@@ -46,7 +46,7 @@ public class LookupBytesTable extends LookupTable<ByteArray> {
@Override
protected String toString(ByteArray cell) {
- return Bytes.toString(cell.data);
+ return cell.toString();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsCombiner.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsCombiner.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsCombiner.java
index c9a7a34..b3912d9 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsCombiner.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsCombiner.java
@@ -50,7 +50,7 @@ public class FactDistinctColumnsCombiner extends KylinReducer<ShortWritable, Tex
}
for (ByteArray value : set) {
- outputValue.set(value.data);
+ outputValue.set(value.array(), value.offset(), value.length());
context.write(key, outputValue);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
index fdae135..15b4970 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
@@ -82,7 +82,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text
try {
for (ByteArray value : set) {
- out.write(value.data);
+ out.write(value.array(), value.offset(), value.length());
out.write('\n');
}
} finally {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
index 9dda89c..e75457e 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
@@ -256,7 +256,7 @@ public class NewBaseCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Text, Text, T
}
} else {
for (Pair<Integer, Integer> relation : tableJoin.dimTblColAsRowKey) {
- keyBytesBuf[relation.getSecond()] = dimRow[relation.getFirst()].data;
+ keyBytesBuf[relation.getSecond()] = dimRow[relation.getFirst()].array();
filledDimension++;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java
index dc04d6e..dcfd6f6 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java
@@ -51,7 +51,7 @@ public class IIDistinctColumnsCombiner extends KylinReducer<ShortWritable, Text,
}
for (ByteArray value : set) {
- outputValue.set(value.data);
+ outputValue.set(value.array(), value.offset(), value.length());
context.write(key, outputValue);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
index d8f9543..e5eb65e 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
@@ -66,7 +66,7 @@ public class IIDistinctColumnsReducer extends KylinReducer<ShortWritable, Text,
try {
for (ByteArray value : set) {
- out.write(value.data);
+ out.write(value.array(), value.offset(), value.length());
out.write('\n');
}
} finally {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java b/metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java
index 4b85172..e8ccd27 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java
@@ -23,7 +23,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import org.apache.kylin.metadata.tuple.ICodeSystem;
import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
@@ -65,7 +64,7 @@ public class CaseTupleFilter extends TupleFilter {
}
@Override
- public boolean evaluate(IEvaluatableTuple tuple, ICodeSystem<?> cs) {
+ public boolean evaluate(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs) {
if (whenFilters.size() != thenFilters.size()) {
elseFilter = whenFilters.remove(whenFilters.size() - 1);
}
@@ -103,12 +102,12 @@ public class CaseTupleFilter extends TupleFilter {
}
@Override
- public byte[] serialize(ICodeSystem<?> cs) {
+ public byte[] serialize(IFilterCodeSystem<?> cs) {
return new byte[0];
}
@Override
- public void deserialize(byte[] bytes, ICodeSystem<?> cs) {
+ public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java b/metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
index 75f534d..f689ccb 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
@@ -28,7 +28,6 @@ import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.ICodeSystem;
import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
/**
@@ -68,7 +67,7 @@ public class ColumnTupleFilter extends TupleFilter {
}
@Override
- public boolean evaluate(IEvaluatableTuple tuple, ICodeSystem<?> cs) {
+ public boolean evaluate(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs) {
this.tupleValue = tuple.getValue(columnRef);
return true;
}
@@ -85,7 +84,7 @@ public class ColumnTupleFilter extends TupleFilter {
}
@Override
- public byte[] serialize(ICodeSystem<?> cs) {
+ public byte[] serialize(IFilterCodeSystem<?> cs) {
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
String table = columnRef.getTable();
BytesUtil.writeUTFString(table, buffer);
@@ -105,7 +104,7 @@ public class ColumnTupleFilter extends TupleFilter {
}
@Override
- public void deserialize(byte[] bytes, ICodeSystem<?> cs) {
+ public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
TableDesc table = null;
ColumnDesc column = new ColumnDesc();
ByteBuffer buffer = ByteBuffer.wrap(bytes);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java b/metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
index 71950ff..899320f 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
@@ -27,7 +27,6 @@ import java.util.Map;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.ICodeSystem;
import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
/**
@@ -131,7 +130,7 @@ public class CompareTupleFilter extends TupleFilter {
// TODO requires generalize, currently only evaluates COLUMN {op} CONST
@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
- public boolean evaluate(IEvaluatableTuple tuple, ICodeSystem cs) {
+ public boolean evaluate(IEvaluatableTuple tuple, IFilterCodeSystem cs) {
// extract tuple value
Object tupleValue = null;
for (TupleFilter filter : this.children) {
@@ -199,7 +198,7 @@ public class CompareTupleFilter extends TupleFilter {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
- public byte[] serialize(ICodeSystem cs) {
+ public byte[] serialize(IFilterCodeSystem cs) {
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
int size = this.dynamicVariables.size();
BytesUtil.writeVInt(size, buffer);
@@ -213,7 +212,7 @@ public class CompareTupleFilter extends TupleFilter {
}
@Override
- public void deserialize(byte[] bytes, ICodeSystem<?> cs) {
+ public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
this.dynamicVariables.clear();
ByteBuffer buffer = ByteBuffer.wrap(bytes);
int size = BytesUtil.readVInt(buffer);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java b/metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java
index c92b743..f372b4a 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java
@@ -24,7 +24,6 @@ import java.util.Collections;
import java.util.HashSet;
import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.metadata.tuple.ICodeSystem;
import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
/**
@@ -65,7 +64,7 @@ public class ConstantTupleFilter extends TupleFilter {
}
@Override
- public boolean evaluate(IEvaluatableTuple tuple, ICodeSystem<?> cs) {
+ public boolean evaluate(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs) {
return constantValues.size() > 0;
}
@@ -81,7 +80,7 @@ public class ConstantTupleFilter extends TupleFilter {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
- public byte[] serialize(ICodeSystem cs) {
+ public byte[] serialize(IFilterCodeSystem cs) {
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
int size = this.constantValues.size();
BytesUtil.writeVInt(size, buffer);
@@ -94,7 +93,7 @@ public class ConstantTupleFilter extends TupleFilter {
}
@Override
- public void deserialize(byte[] bytes, ICodeSystem<?> cs) {
+ public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
this.constantValues.clear();
ByteBuffer buffer = ByteBuffer.wrap(bytes);
int size = BytesUtil.readVInt(buffer);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java b/metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java
index 3d7f65f..a482519 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java
@@ -23,7 +23,6 @@ import java.util.Collection;
import java.util.Collections;
import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.metadata.tuple.ICodeSystem;
import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
/**
@@ -55,7 +54,7 @@ public class DynamicTupleFilter extends TupleFilter {
}
@Override
- public boolean evaluate(IEvaluatableTuple tuple, ICodeSystem<?> cs) {
+ public boolean evaluate(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs) {
return true;
}
@@ -70,7 +69,7 @@ public class DynamicTupleFilter extends TupleFilter {
}
@Override
- public byte[] serialize(ICodeSystem<?> cs) {
+ public byte[] serialize(IFilterCodeSystem<?> cs) {
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
BytesUtil.writeUTFString(variableName, buffer);
byte[] result = new byte[buffer.position()];
@@ -79,7 +78,7 @@ public class DynamicTupleFilter extends TupleFilter {
}
@Override
- public void deserialize(byte[] bytes, ICodeSystem<?> cs) {
+ public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
this.variableName = BytesUtil.readUTFString(buffer);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java b/metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
index ad1395c..a7705d3 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import org.apache.kylin.metadata.tuple.ICodeSystem;
import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
@@ -55,7 +54,7 @@ public class ExtractTupleFilter extends TupleFilter {
}
@Override
- public boolean evaluate(IEvaluatableTuple tuple, ICodeSystem<?> cs) {
+ public boolean evaluate(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs) {
// extract tuple value
String extractType = null;
String tupleValue = null;
@@ -115,12 +114,12 @@ public class ExtractTupleFilter extends TupleFilter {
}
@Override
- public byte[] serialize(ICodeSystem<?> cs) {
+ public byte[] serialize(IFilterCodeSystem<?> cs) {
return new byte[0];
}
@Override
- public void deserialize(byte[] bytes, ICodeSystem<?> cs) {
+ public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/filter/IFilterCodeSystem.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/filter/IFilterCodeSystem.java b/metadata/src/main/java/org/apache/kylin/metadata/filter/IFilterCodeSystem.java
new file mode 100644
index 0000000..5382dcc
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/metadata/filter/IFilterCodeSystem.java
@@ -0,0 +1,30 @@
+package org.apache.kylin.metadata.filter;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Decides how constant values are coded and compared.
+ *
+ * TupleFilter are involved in both query engine and coprocessor. In query engine, the values are strings.
+ * In coprocessor, the values are dictionary IDs.
+ *
+ * The type parameter is the java type of code, which should be bytes. However some legacy implementation
+ * stores code as String.
+ *
+ * @author yangli9
+ */
+public interface IFilterCodeSystem<T> {
+
+ /** if given code represents the NULL value */
+ boolean isNull(T code);
+
+ /** compare two values by their codes */
+ int compare(T code1, T code2);
+
+ /** write code to buffer */
+ void serialize(T code, ByteBuffer buf);
+
+ /** read code from buffer */
+ T deserialize(ByteBuffer buf);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java b/metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
index 0c8f96b..1844392 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
@@ -24,7 +24,6 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-import org.apache.kylin.metadata.tuple.ICodeSystem;
import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
@@ -72,7 +71,7 @@ public class LogicalTupleFilter extends TupleFilter {
}
@Override
- public boolean evaluate(IEvaluatableTuple tuple, ICodeSystem<?> cs) {
+ public boolean evaluate(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs) {
switch (this.operator) {
case AND:
return evalAnd(tuple, cs);
@@ -85,7 +84,7 @@ public class LogicalTupleFilter extends TupleFilter {
}
}
- private boolean evalAnd(IEvaluatableTuple tuple, ICodeSystem<?> cs) {
+ private boolean evalAnd(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs) {
for (TupleFilter filter : this.children) {
if (!filter.evaluate(tuple, cs)) {
return false;
@@ -94,7 +93,7 @@ public class LogicalTupleFilter extends TupleFilter {
return true;
}
- private boolean evalOr(IEvaluatableTuple tuple, ICodeSystem<?> cs) {
+ private boolean evalOr(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs) {
for (TupleFilter filter : this.children) {
if (filter.evaluate(tuple, cs)) {
return true;
@@ -103,7 +102,7 @@ public class LogicalTupleFilter extends TupleFilter {
return false;
}
- private boolean evalNot(IEvaluatableTuple tuple, ICodeSystem<?> cs) {
+ private boolean evalNot(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs) {
return !this.children.get(0).evaluate(tuple, cs);
}
@@ -118,12 +117,12 @@ public class LogicalTupleFilter extends TupleFilter {
}
@Override
- public byte[] serialize(ICodeSystem<?> cs) {
+ public byte[] serialize(IFilterCodeSystem<?> cs) {
return new byte[0];
}
@Override
- public void deserialize(byte[] bytes, ICodeSystem<?> cs) {
+ public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/filter/StringCodeSystem.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/filter/StringCodeSystem.java b/metadata/src/main/java/org/apache/kylin/metadata/filter/StringCodeSystem.java
index 5240491..5b0040d 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/filter/StringCodeSystem.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/filter/StringCodeSystem.java
@@ -3,32 +3,21 @@ package org.apache.kylin.metadata.filter;
import java.nio.ByteBuffer;
import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.metadata.tuple.ICodeSystem;
/**
* A simple code system where all values are strings and conform to string comparison system.
*
* @author yangli9
*/
-public class StringCodeSystem implements ICodeSystem<String> {
+public class StringCodeSystem implements IFilterCodeSystem<String> {
public static final StringCodeSystem INSTANCE = new StringCodeSystem();
- private StringCodeSystem() {
+ protected StringCodeSystem() {
// singleton
}
@Override
- public String encode(Object value) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Object decode(String code) {
- throw new UnsupportedOperationException();
- }
-
- @Override
public boolean isNull(String value) {
return value == null;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java b/metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
index 75d55c3..5dfce7d 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
@@ -25,7 +25,6 @@ import java.util.Map;
import java.util.Set;
import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.ICodeSystem;
import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
import com.google.common.collect.Maps;
@@ -190,13 +189,13 @@ public abstract class TupleFilter {
public abstract boolean isEvaluable();
- public abstract boolean evaluate(IEvaluatableTuple tuple, ICodeSystem<?> cs);
+ public abstract boolean evaluate(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs);
public abstract Collection<?> getValues();
- abstract byte[] serialize(ICodeSystem<?> cs);
+ abstract byte[] serialize(IFilterCodeSystem<?> cs);
- abstract void deserialize(byte[] bytes, ICodeSystem<?> cs);
+ abstract void deserialize(byte[] bytes, IFilterCodeSystem<?> cs);
public static boolean isEvaluableRecursively(TupleFilter filter) {
if (filter == null)
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java
index 5164c5b..f824777 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java
@@ -24,7 +24,6 @@ import java.util.Map;
import java.util.Stack;
import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.metadata.tuple.ICodeSystem;
/**
* http://eli.thegreenplace.net/2011/09/29/an-interesting-tree-serialization-algorithm-from-dwarf
@@ -47,11 +46,11 @@ public class TupleFilterSerializer {
}
}
- public static byte[] serialize(TupleFilter rootFilter, ICodeSystem<?> cs) {
+ public static byte[] serialize(TupleFilter rootFilter, IFilterCodeSystem<?> cs) {
return serialize(rootFilter, null, cs);
}
- public static byte[] serialize(TupleFilter rootFilter, Decorator decorator, ICodeSystem<?> cs) {
+ public static byte[] serialize(TupleFilter rootFilter, Decorator decorator, IFilterCodeSystem<?> cs) {
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
internalSerialize(rootFilter, decorator, buffer, cs);
byte[] result = new byte[buffer.position()];
@@ -59,7 +58,7 @@ public class TupleFilterSerializer {
return result;
}
- private static void internalSerialize(TupleFilter filter, Decorator decorator, ByteBuffer buffer, ICodeSystem<?> cs) {
+ private static void internalSerialize(TupleFilter filter, Decorator decorator, ByteBuffer buffer, IFilterCodeSystem<?> cs) {
if (decorator != null) { // give decorator a chance to manipulate the
// output filter
filter = decorator.onSerialize(filter);
@@ -84,7 +83,7 @@ public class TupleFilterSerializer {
}
}
- private static void serializeFilter(int flag, TupleFilter filter, Decorator decorator, ByteBuffer buffer, ICodeSystem<?> cs) {
+ private static void serializeFilter(int flag, TupleFilter filter, Decorator decorator, ByteBuffer buffer, IFilterCodeSystem<?> cs) {
if (flag < 0) {
BytesUtil.writeVInt(-1, buffer);
} else {
@@ -96,7 +95,7 @@ public class TupleFilterSerializer {
}
}
- public static TupleFilter deserialize(byte[] bytes, ICodeSystem<?> cs) {
+ public static TupleFilter deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
TupleFilter rootFilter = null;
Stack<TupleFilter> parentStack = new Stack<TupleFilter>();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalSerializer.java
deleted file mode 100644
index 78654ea..0000000
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalSerializer.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata.measure;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-
-/**
- * @author yangli9
- *
- */
-public class BigDecimalSerializer extends DataTypeSerializer<BigDecimal> {
-
- @Override
- public void serialize(BigDecimal value, ByteBuffer out) {
- byte[] bytes = value.unscaledValue().toByteArray();
-
- BytesUtil.writeVInt(value.scale(), out);
- BytesUtil.writeVInt(bytes.length, out);
- out.put(bytes);
- }
-
- @Override
- public BigDecimal deserialize(ByteBuffer in) {
- int scale = BytesUtil.readVInt(in);
- int n = BytesUtil.readVInt(in);
-
- byte[] bytes = new byte[n];
- in.get(bytes);
-
- return new BigDecimal(new BigInteger(bytes), scale);
- }
-
-
- @Override
- public int peekLength(ByteBuffer in) {
- int mark = in.position();
-
- @SuppressWarnings("unused")
- int scale = BytesUtil.readVInt(in);
- int n = BytesUtil.readVInt(in);
- int len = in.position() - mark + n;
-
- in.position(mark);
- return len;
- }
-
- @Override
- public BigDecimal valueOf(byte[] value) {
- if (value == null)
- return new BigDecimal(0);
- else
- return new BigDecimal(Bytes.toString(value));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/measure/DataTypeSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/DataTypeSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/DataTypeSerializer.java
deleted file mode 100644
index ecaaea7..0000000
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/DataTypeSerializer.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata.measure;
-
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-
-import org.apache.kylin.common.util.BytesSerializer;
-import org.apache.kylin.metadata.model.DataType;
-
-/**
- * @author yangli9
- *
- */
-abstract public class DataTypeSerializer<T> implements BytesSerializer<T> {
-
- final static HashMap<String, Class<?>> implementations = new HashMap<String, Class<?>>();
- static {
- implementations.put("decimal", BigDecimalSerializer.class);
- implementations.put("double", DoubleSerializer.class);
- implementations.put("float", DoubleSerializer.class);
- implementations.put("bigint", LongSerializer.class);
- implementations.put("long", LongSerializer.class);
- implementations.put("integer", LongSerializer.class);
- implementations.put("int", LongSerializer.class);
- }
-
- public static DataTypeSerializer<?> create(String dataType) {
- DataType type = DataType.getInstance(dataType);
- if (type.isHLLC()) {
- return new HLLCSerializer(type.getPrecision());
- }
-
- Class<?> clz = implementations.get(type.getName());
- if (clz == null)
- throw new RuntimeException("No MeasureSerializer for type " + dataType);
-
- try {
- return (DataTypeSerializer<?>) clz.newInstance();
- } catch (Exception e) {
- throw new RuntimeException(e); // never happen
- }
- }
-
- /** peek into buffer and return the length of serialization */
- abstract public int peekLength(ByteBuffer in);
-
- /** convert from String to obj */
- abstract public T valueOf(byte[] value);
-
- public T valueOf(String value) {
- try {
- return valueOf(value.getBytes("UTF-8"));
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e); // never happen
- }
- }
-
- /** convert from obj to string */
- public String toString(T value) {
- if (value == null)
- return "NULL";
- else
- return value.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleSerializer.java
deleted file mode 100644
index 2874a57..0000000
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleSerializer.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata.measure;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.DoubleWritable;
-
-/**
- * @author yangli9
- *
- */
-public class DoubleSerializer extends DataTypeSerializer<DoubleWritable> {
-
- // avoid mass object creation
- DoubleWritable current = new DoubleWritable();
-
- @Override
- public void serialize(DoubleWritable value, ByteBuffer out) {
- out.putDouble(value.get());
- }
-
- @Override
- public DoubleWritable deserialize(ByteBuffer in) {
- current.set(in.getDouble());
- return current;
- }
-
- @Override
- public int peekLength(ByteBuffer in) {
- return 8;
- }
-
- @Override
- public DoubleWritable valueOf(byte[] value) {
- if (value == null)
- current.set(0d);
- else
- current.set(Double.parseDouble(Bytes.toString(value)));
- return current;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCSerializer.java
deleted file mode 100644
index 2303c82..0000000
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCSerializer.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata.measure;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-
-/**
- * @author yangli9
- *
- */
-public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounter> {
-
- HyperLogLogPlusCounter current;
-
- public HLLCSerializer(int p) {
- current = new HyperLogLogPlusCounter(p);
- }
-
- @Override
- public void serialize(HyperLogLogPlusCounter value, ByteBuffer out) {
- try {
- value.writeRegisters(out);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public HyperLogLogPlusCounter deserialize(ByteBuffer in) {
- try {
- current.readRegisters(in);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return current;
- }
-
- @Override
- public int peekLength(ByteBuffer in) {
- return current.peekLength(in);
- }
-
- @Override
- public HyperLogLogPlusCounter valueOf(byte[] value) {
- current.clear();
- if (value == null)
- current.add("__nUlL__");
- else
- current.add(value);
- return current;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/measure/LongSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/LongSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/LongSerializer.java
deleted file mode 100644
index 9dca987..0000000
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/LongSerializer.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata.measure;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.kylin.common.util.BytesUtil;
-
-/**
- * @author yangli9
- *
- */
-public class LongSerializer extends DataTypeSerializer<LongWritable> {
-
- // avoid mass object creation
- LongWritable current = new LongWritable();
-
- @Override
- public void serialize(LongWritable value, ByteBuffer out) {
- BytesUtil.writeVLong(value.get(), out);
- }
-
- @Override
- public LongWritable deserialize(ByteBuffer in) {
- current.set(BytesUtil.readVLong(in));
- return current;
- }
-
- @Override
- public int peekLength(ByteBuffer in) {
- int mark = in.position();
-
- BytesUtil.readVLong(in);
- int len = in.position() - mark;
-
- in.position(mark);
- return len;
- }
-
- @Override
- public LongWritable valueOf(byte[] value) {
- if (value == null)
- current.set(0L);
- else
- current.set(Long.parseLong(Bytes.toString(value)));
- return current;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureCodec.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureCodec.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureCodec.java
index 95a246c..9da0fbe 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureCodec.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureCodec.java
@@ -22,8 +22,8 @@ import java.nio.ByteBuffer;
import java.util.Collection;
import org.apache.hadoop.io.Text;
-
import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.serializer.DataTypeSerializer;
/**
* @author yangli9
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/serializer/BigDecimalSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/serializer/BigDecimalSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/serializer/BigDecimalSerializer.java
new file mode 100644
index 0000000..0c7b37d
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/metadata/serializer/BigDecimalSerializer.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.serializer;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+
+/**
+ * @author yangli9
+ *
+ */
+public class BigDecimalSerializer extends DataTypeSerializer<BigDecimal> {
+
+ @Override
+ public void serialize(BigDecimal value, ByteBuffer out) {
+ byte[] bytes = value.unscaledValue().toByteArray();
+
+ BytesUtil.writeVInt(value.scale(), out);
+ BytesUtil.writeVInt(bytes.length, out);
+ out.put(bytes);
+ }
+
+ @Override
+ public BigDecimal deserialize(ByteBuffer in) {
+ int scale = BytesUtil.readVInt(in);
+ int n = BytesUtil.readVInt(in);
+
+ byte[] bytes = new byte[n];
+ in.get(bytes);
+
+ return new BigDecimal(new BigInteger(bytes), scale);
+ }
+
+
+ @Override
+ public int peekLength(ByteBuffer in) {
+ int mark = in.position();
+
+ @SuppressWarnings("unused")
+ int scale = BytesUtil.readVInt(in);
+ int n = BytesUtil.readVInt(in);
+ int len = in.position() - mark + n;
+
+ in.position(mark);
+ return len;
+ }
+
+ @Override
+ public BigDecimal valueOf(byte[] value) {
+ if (value == null)
+ return new BigDecimal(0);
+ else
+ return new BigDecimal(Bytes.toString(value));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/serializer/DataTypeSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/serializer/DataTypeSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/serializer/DataTypeSerializer.java
new file mode 100644
index 0000000..5828178
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/metadata/serializer/DataTypeSerializer.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.serializer;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+
+import org.apache.kylin.common.util.BytesSerializer;
+import org.apache.kylin.metadata.model.DataType;
+
+/**
+ * @author yangli9
+ *
+ */
+abstract public class DataTypeSerializer<T> implements BytesSerializer<T> {
+
+ final static HashMap<String, Class<?>> implementations = new HashMap<String, Class<?>>();
+ static {
+ implementations.put("string", StringSerializer.class);
+ implementations.put("decimal", BigDecimalSerializer.class);
+ implementations.put("double", DoubleSerializer.class);
+ implementations.put("float", DoubleSerializer.class);
+ implementations.put("bigint", LongSerializer.class);
+ implementations.put("long", LongSerializer.class);
+ implementations.put("integer", LongSerializer.class);
+ implementations.put("int", LongSerializer.class);
+ }
+
+ public static DataTypeSerializer<?> create(String dataType) {
+ return create(DataType.getInstance(dataType));
+ }
+
+ public static DataTypeSerializer<?> create(DataType type) {
+ if (type.isHLLC()) {
+ return new HLLCSerializer(type.getPrecision());
+ }
+
+ Class<?> clz = implementations.get(type.getName());
+ if (clz == null)
+ throw new RuntimeException("No MeasureSerializer for type " + type);
+
+ try {
+ return (DataTypeSerializer<?>) clz.newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException(e); // never happen
+ }
+ }
+
+ /** peek into buffer and return the length of serialization */
+ abstract public int peekLength(ByteBuffer in);
+
+ /** convert from String to obj */
+ abstract public T valueOf(byte[] value);
+
+ public T valueOf(String value) {
+ try {
+ return valueOf(value.getBytes("UTF-8"));
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e); // never happen
+ }
+ }
+
+ /** convert from obj to string */
+ public String toString(T value) {
+ if (value == null)
+ return "NULL";
+ else
+ return value.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/serializer/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/serializer/DoubleSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/serializer/DoubleSerializer.java
new file mode 100644
index 0000000..455a9d6
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/metadata/serializer/DoubleSerializer.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.serializer;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DoubleWritable;
+
+/**
+ * @author yangli9
+ *
+ */
+public class DoubleSerializer extends DataTypeSerializer<DoubleWritable> {
+
+ // avoid mass object creation
+ DoubleWritable current = new DoubleWritable();
+
+ @Override
+ public void serialize(DoubleWritable value, ByteBuffer out) {
+ out.putDouble(value.get());
+ }
+
+ @Override
+ public DoubleWritable deserialize(ByteBuffer in) {
+ current.set(in.getDouble());
+ return current;
+ }
+
+ @Override
+ public int peekLength(ByteBuffer in) {
+ return 8;
+ }
+
+ @Override
+ public DoubleWritable valueOf(byte[] value) {
+ if (value == null)
+ current.set(0d);
+ else
+ current.set(Double.parseDouble(Bytes.toString(value)));
+ return current;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/serializer/HLLCSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/serializer/HLLCSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/serializer/HLLCSerializer.java
new file mode 100644
index 0000000..dafe661
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/metadata/serializer/HLLCSerializer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.serializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+
+/**
+ * @author yangli9
+ *
+ */
+public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounter> {
+
+ HyperLogLogPlusCounter current;
+
+ public HLLCSerializer(int p) {
+ current = new HyperLogLogPlusCounter(p);
+ }
+
+ @Override
+ public void serialize(HyperLogLogPlusCounter value, ByteBuffer out) {
+ try {
+ value.writeRegisters(out);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public HyperLogLogPlusCounter deserialize(ByteBuffer in) {
+ try {
+ current.readRegisters(in);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return current;
+ }
+
+ @Override
+ public int peekLength(ByteBuffer in) {
+ return current.peekLength(in);
+ }
+
+ @Override
+ public HyperLogLogPlusCounter valueOf(byte[] value) {
+ current.clear();
+ if (value == null)
+ current.add("__nUlL__");
+ else
+ current.add(value);
+ return current;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/serializer/LongSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/serializer/LongSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/serializer/LongSerializer.java
new file mode 100644
index 0000000..5916e58
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/metadata/serializer/LongSerializer.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.serializer;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.common.util.BytesUtil;
+
+/**
+ * @author yangli9
+ *
+ */
+public class LongSerializer extends DataTypeSerializer<LongWritable> {
+
+ // avoid mass object creation
+ LongWritable current = new LongWritable();
+
+ @Override
+ public void serialize(LongWritable value, ByteBuffer out) {
+ BytesUtil.writeVLong(value.get(), out);
+ }
+
+ @Override
+ public LongWritable deserialize(ByteBuffer in) {
+ current.set(BytesUtil.readVLong(in));
+ return current;
+ }
+
+ @Override
+ public int peekLength(ByteBuffer in) {
+ int mark = in.position();
+
+ BytesUtil.readVLong(in);
+ int len = in.position() - mark;
+
+ in.position(mark);
+ return len;
+ }
+
+ @Override
+ public LongWritable valueOf(byte[] value) {
+ if (value == null)
+ current.set(0L);
+ else
+ current.set(Long.parseLong(Bytes.toString(value)));
+ return current;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/serializer/StringSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/serializer/StringSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/serializer/StringSerializer.java
new file mode 100644
index 0000000..56725c0
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/metadata/serializer/StringSerializer.java
@@ -0,0 +1,30 @@
+package org.apache.kylin.metadata.serializer;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+
+public class StringSerializer extends DataTypeSerializer<String> {
+
+ @Override
+ public void serialize(String value, ByteBuffer out) {
+ BytesUtil.writeUTFString(value, out);
+ }
+
+ @Override
+ public String deserialize(ByteBuffer in) {
+ return BytesUtil.readUTFString(in);
+ }
+
+ @Override
+ public int peekLength(ByteBuffer in) {
+ return BytesUtil.peekByteArrayLength(in);
+ }
+
+ @Override
+ public String valueOf(byte[] value) {
+ return Bytes.toString(value);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/tuple/ICodeSystem.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/tuple/ICodeSystem.java b/metadata/src/main/java/org/apache/kylin/metadata/tuple/ICodeSystem.java
deleted file mode 100644
index b5bc025..0000000
--- a/metadata/src/main/java/org/apache/kylin/metadata/tuple/ICodeSystem.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package org.apache.kylin.metadata.tuple;
-
-import java.nio.ByteBuffer;
-
-/**
- * Decides how constant values are coded and compared.
- *
- * TupleFilter are involved in both query engine and coprocessor. In query engine, the values are strings.
- * In coprocessor, the values are dictionary IDs.
- *
- * The type parameter is java type of code, which should be bytes. However some legacy implementation
- * stores code as String.
- *
- * @author yangli9
- */
-public interface ICodeSystem<T> {
-
- T encode(Object value);
-
- Object decode(T code);
-
- boolean isNull(T code);
-
- int compare(T code1, T code2);
-
- void serialize(T code, ByteBuffer buffer);
-
- T deserialize(ByteBuffer code);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
new file mode 100644
index 0000000..1a14a14
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
@@ -0,0 +1,144 @@
+package org.apache.kylin.storage.gridtable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.measure.MeasureAggregator;
+
+import com.google.common.collect.Maps;
+
+public class GTAggregateScanner implements IGTScanner {
+
+ final GTInfo info;
+ final BitSet dimensions;
+ final BitSet metrics;
+ final TupleFilter filter;
+ final GTRawScanner rawScanner;
+
+ GTAggregateScanner(GTInfo info, IGTStore store, GTRecord pkStart, GTRecord pkEndExclusive, BitSet dimensions, BitSet metrics, TupleFilter filter) {
+ if (dimensions.intersects(info.colIsMetrics) || metrics.intersects(info.colIsDimension))
+ throw new IllegalStateException();
+
+ this.info = info;
+ this.dimensions = dimensions;
+ this.metrics = metrics;
+ this.filter = filter;
+ this.rawScanner = new GTRawScanner(info, store, pkStart, pkEndExclusive, dimensions, metrics, filter);
+ }
+
+ @Override
+ public void close() throws IOException {
+ rawScanner.close();
+ }
+
+ @Override
+ public Iterator<GTRecord> iterator() {
+ AggregationCache aggrCache = new AggregationCache();
+ for (GTRecord r : rawScanner) {
+ aggrCache.aggregate(r);
+ }
+ return aggrCache.iterator();
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ class AggregationCache {
+ final SortedMap<GTRecord, MeasureAggregator[]> aggBufMap;
+
+ public AggregationCache() {
+ this.aggBufMap = Maps.newTreeMap();
+ }
+
+ void aggregate(GTRecord r) {
+ r.maskForEqualHashComp = dimensions;
+ MeasureAggregator[] aggrs = aggBufMap.get(r);
+ if (aggrs == null) {
+ aggrs = new MeasureAggregator[metrics.cardinality()];
+ for (int i = 0, col = -1; i < aggrs.length; i++) {
+ col = metrics.nextSetBit(col + 1);
+ aggrs[i] = info.codeSystem.newMetricsAggregator(col);
+ }
+ aggBufMap.put(r.copy(dimensions), aggrs);
+ }
+
+ for (int i = 0, col = -1; i < aggrs.length; i++) {
+ col = metrics.nextSetBit(col + 1);
+ Object metrics = info.codeSystem.decodeColumnValue(col, r.cols[col].wrapAsBuffer());
+ aggrs[i].aggregate(metrics);
+ }
+ }
+
+ public Iterator<GTRecord> iterator() {
+ return new Iterator<GTRecord>() {
+
+ Iterator<Entry<GTRecord, MeasureAggregator[]>> it = aggBufMap.entrySet().iterator();
+ ByteBuffer metricsBuf = ByteBuffer.allocate(info.maxRecordLength);
+ GTRecord oneRecord = new GTRecord(info); // avoid instance creation
+
+ @Override
+ public boolean hasNext() {
+ return it.hasNext();
+ }
+
+ @Override
+ public GTRecord next() {
+ Entry<GTRecord, MeasureAggregator[]> entry = it.next();
+
+ GTRecord dims = entry.getKey();
+ for (int i = dimensions.nextSetBit(0); i >= 0; i = dimensions.nextSetBit(i + 1)) {
+ oneRecord.cols[i].set(dims.cols[i]);
+ }
+
+ metricsBuf.clear();
+ MeasureAggregator[] aggrs = entry.getValue();
+ for (int i = 0, col = -1; i < aggrs.length; i++) {
+ col = metrics.nextSetBit(col + 1);
+ int pos = metricsBuf.position();
+ info.codeSystem.encodeColumnValue(col, aggrs[i].getState(), metricsBuf);
+ oneRecord.cols[col].set(metricsBuf.array(), pos, metricsBuf.position() - pos);
+ }
+
+ return oneRecord;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ public long getSize() {
+ return aggBufMap.size();
+ }
+
+ // ============================================================================
+
+ transient int rowMemBytes;
+ static final int MEMORY_USAGE_CAP = 500 * 1024 * 1024; // 500 MB
+
+ public void checkMemoryUsage() {
+ // about memory calculation,
+ // http://seniorjava.wordpress.com/2013/09/01/java-objects-memory-size-reference/
+ if (rowMemBytes <= 0) {
+ if (aggBufMap.size() > 0) {
+ rowMemBytes = 0;
+ MeasureAggregator[] measureAggregators = aggBufMap.get(aggBufMap.firstKey());
+ for (MeasureAggregator agg : measureAggregators) {
+ rowMemBytes += agg.getMemBytes();
+ }
+ }
+ }
+ int size = aggBufMap.size();
+ int memUsage = (40 + rowMemBytes) * size;
+ if (memUsage > MEMORY_USAGE_CAP) {
+ throw new RuntimeException("Kylin coprocess memory usage goes beyond cap, (40 + " + rowMemBytes + ") * " + size + " > " + MEMORY_USAGE_CAP + ". Abord coprocessor.");
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java
index 0083b86..7e305c5 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java
@@ -3,21 +3,21 @@ package org.apache.kylin.storage.gridtable;
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import org.apache.kylin.storage.gridtable.GTStore.GTWriter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.storage.gridtable.IGTStore.IGTStoreWriter;
public class GTBuilder implements Closeable, Flushable {
final private GTInfo info;
- final private GTWriter writer;
+ final private IGTStoreWriter writer;
private GTRowBlock block;
- public GTBuilder(GTInfo info, int shard, GTStore store) {
+ GTBuilder(GTInfo info, int shard, IGTStore store) {
this.info = info;
this.writer = store.rebuild(shard);
- this.block = new GTRowBlock(info);
+ this.block = GTRowBlock.allocate(info);
}
public void write(GTRecord r) throws IOException {
@@ -25,11 +25,8 @@ public class GTBuilder implements Closeable, Flushable {
if (block.isEmpty()) {
makePrimaryKey(r, block.primaryKey);
}
- for (int c = 0; c < info.nColBlocks; c++) {
- ByteBuffer cellBuf = block.cellBlocks[c];
- for (int i = info.colBlockCuts[c], end = info.colBlockCuts[c + 1]; i < end; i++) {
- append(cellBuf, r.cols[i]);
- }
+ for (int i = 0; i < info.colBlocks.length; i++) {
+ r.exportColumnBlock(i, block.cellBlockBuffers[i]);
}
block.nRows++;
@@ -38,24 +35,15 @@ public class GTBuilder implements Closeable, Flushable {
}
}
- private void makePrimaryKey(GTRecord r, ByteBuffer buf) {
- buf.clear();
-
- for (int i : info.primaryKey) {
- append(buf, r.cols[i]);
- }
-
- buf.flip();
- }
-
- private void append(ByteBuffer buf, ByteBuffer data) {
- buf.put(data.array(), data.arrayOffset(), data.limit());
+ private void makePrimaryKey(GTRecord r, ByteArray buf) {
+ r.exportColumns(info.primaryKey, buf);
}
@Override
public void flush() throws IOException {
writer.write(block);
block.clear();
+ block.seqId++;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/storage/src/main/java/org/apache/kylin/storage/gridtable/GTCodeSystem.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTCodeSystem.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTCodeSystem.java
new file mode 100644
index 0000000..f9405be
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTCodeSystem.java
@@ -0,0 +1,76 @@
+package org.apache.kylin.storage.gridtable;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.metadata.filter.IFilterCodeSystem;
+import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.apache.kylin.metadata.serializer.DataTypeSerializer;
+
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class GTCodeSystem implements IGTCodeSystem {
+
+ final private GTInfo info;
+ final private DataTypeSerializer[] serializers;
+ final private IFilterCodeSystem<ByteArray> filterCS;
+
+ public GTCodeSystem(GTInfo info) {
+ this.info = info;
+
+ this.serializers = new DataTypeSerializer[info.nColumns];
+ for (int i = 0; i < info.nColumns; i++) {
+ this.serializers[i] = DataTypeSerializer.create(info.colTypes[i]);
+ }
+
+ this.filterCS = new IFilterCodeSystem<ByteArray>() {
+ @Override
+ public boolean isNull(ByteArray code) {
+ return (code == null || code.length() == 0);
+ }
+
+ @Override
+ public int compare(ByteArray code1, ByteArray code2) {
+ return code1.compareTo(code2);
+ }
+
+ @Override
+ public void serialize(ByteArray code, ByteBuffer buffer) {
+ BytesUtil.writeByteArray(code.array(), code.offset(), code.length(), buffer);
+ }
+
+ @Override
+ public ByteArray deserialize(ByteBuffer buffer) {
+ return new ByteArray(BytesUtil.readByteArray(buffer));
+ }
+ };
+ }
+
+ @Override
+ public int codeLength(int col, ByteBuffer buf) {
+ return serializers[col].peekLength(buf);
+ }
+
+ @Override
+ public IFilterCodeSystem<ByteArray> getFilterCodeSystem() {
+ return filterCS;
+ }
+
+ // ============================================================================
+
+ @Override
+ public MeasureAggregator<?> newMetricsAggregator(int col) {
+ return MeasureAggregator.create(info.colMetricsAggrFunc[col], info.colTypes[col].getName());
+ }
+
+ @Override
+ public void encodeColumnValue(int col, Object value, ByteBuffer buf) {
+ serializers[col].serialize(value, buf);
+ }
+
+ @Override
+ public Object decodeColumnValue(int col, ByteBuffer buf) {
+ return serializers[col].deserialize(buf);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
index 11a3083..1e2ddd0 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
@@ -1,52 +1,190 @@
package org.apache.kylin.storage.gridtable;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+
import org.apache.kylin.metadata.model.DataType;
public class GTInfo {
+ IGTCodeSystem codeSystem;
+ int maxRecordLength = 1024; // column length can vary
+
// column schema
int nColumns;
- DataType[] colType;
- boolean[] colIsMetrics;
- int maxRecordLength; // column length can vary
-
+ DataType[] colTypes;
+ BitSet colIsDimension;
+ BitSet colIsMetrics;
+ BitSet colAll;
+ String[] colMetricsAggrFunc;
+
// grid info
- int[] primaryKey; // columns sorted and unique
- int rowBlockSize; // 0: no row block
- int nColBlocks;
- int[] colBlockCuts; // [i]: the start index of i.th block; [i+1]: the exclusive end index of i.th block
-
+ BitSet primaryKey; // columns sorted and unique
+ BitSet[] colBlocks; // at least one column block
+ int rowBlockSize; // 0: disable row block
+ boolean rowBlockIndexEnabled;
+
// sharding & rowkey
int nShards; // 0: no sharding
- byte[] rowkeyPrefix;
-
+
+ // instances must be created through Builder
+ private GTInfo() {
+ }
+
public boolean isShardingEnabled() {
return nShards > 0;
}
-
+
public boolean isRowBlockEnabled() {
return rowBlockSize > 0;
}
-
+
+ public boolean isRowBlockIndexEnabled() {
+ return rowBlockIndexEnabled && isRowBlockEnabled();
+ }
+
void validate() {
- if (nColBlocks != colBlockCuts.length - 1) {
+
+ if (codeSystem == null)
+ throw new IllegalStateException();
+
+ if (primaryKey.cardinality() == 0)
throw new IllegalStateException();
+
+ validateColumns();
+ validateColumnBlocks();
+ }
+
+ private void validateColumns() {
+ colAll = new BitSet();
+ colAll.flip(0, nColumns);
+
+ // dimension and metrics must cover all columns
+ if (colIsDimension.intersects(colIsMetrics))
+ throw new IllegalStateException();
+ BitSet all = (BitSet) colAll.clone();
+ all.andNot(colIsDimension);
+ all.andNot(colIsMetrics);
+ if (all.isEmpty() == false)
+ throw new IllegalStateException();
+
+ // column blocks must not overlap
+ for (int i = 0; i < colBlocks.length; i++) {
+ for (int j = i + 1; j < colBlocks.length; j++) {
+ if (colBlocks[i].intersects(colBlocks[j]))
+ throw new IllegalStateException();
+ }
+ }
+ }
+
+ private void validateColumnBlocks() {
+ // column block must cover all columns
+ BitSet merge = new BitSet();
+ for (int i = 0; i < colBlocks.length; i++) {
+ merge.or(colBlocks[i]);
+ }
+ if (merge.equals(colAll) == false)
+ throw new IllegalStateException();
+
+ // When row block is disabled, every row is treated as a row block, row PK is same as
+ // row block PK. Thus PK can be removed from column blocks. See <code>GTRowBlock</code>.
+ if (isRowBlockEnabled() == false) {
+ for (int i = 0; i < colBlocks.length; i++) {
+ colBlocks[i].andNot(primaryKey);
+ }
+ }
+
+ // drop empty column block
+ LinkedList<BitSet> tmp = new LinkedList<BitSet>(Arrays.asList(colBlocks));
+ Iterator<BitSet> it = tmp.iterator();
+ while (it.hasNext()) {
+ BitSet cb = it.next();
+ if (cb.isEmpty())
+ it.remove();
+ }
+ colBlocks = (BitSet[]) tmp.toArray(new BitSet[tmp.size()]);
+ }
+
+ public boolean isDimension(int i) {
+ return colIsMetrics.get(i) == false;
+ }
+
+ public boolean isMetrics(int i) {
+ return colIsMetrics.get(i);
+ }
+
+ public static class Builder {
+ final GTInfo info;
+
+ public Builder() {
+ this.info = new GTInfo();
+ }
+
+ /** required */
+ public Builder setCodeSystem(IGTCodeSystem cs) {
+ info.codeSystem = cs;
+ return this;
+ }
+
+ /** required */
+ public Builder setColumns(DataType[] colTypes, BitSet metrics) {
+ info.nColumns = colTypes.length;
+ info.colTypes = colTypes;
+ if (info.colBlocks == null) {
+ BitSet all = new BitSet();
+ all.flip(0, info.nColumns);
+ info.colBlocks = new BitSet[] { all };
+ }
+ return this;
}
- for (int i = 0; i < nColBlocks; i++) {
- int cbStart = colBlockCuts[i], cbEnd = colBlockCuts[i + 1];
- if (cbStart >= cbEnd)
- throw new IllegalStateException();
-
- // when row block disabled, primary key is persisted on rowkey, thus shall not repeat in column block
- if (isRowBlockEnabled() == false) {
- for (int ii : primaryKey) {
- if (ii >= cbStart && ii < cbEnd)
- throw new IllegalStateException();
+ public Builder setMetrics(Map<Integer, String> metricsAggrFunc) {
+ info.colIsDimension = new BitSet();
+ info.colIsMetrics = new BitSet();
+ info.colMetricsAggrFunc = new String[info.nColumns];
+ for (int i = 0; i < info.nColumns; i++) {
+ if (metricsAggrFunc.containsKey(i)) {
+ info.colIsMetrics.set(i);
+ info.colMetricsAggrFunc[i] = metricsAggrFunc.get(i);
+ } else {
+ info.colIsDimension.set(i);
}
}
+ return this;
+ }
+
+ /** required */
+ public Builder setPrimaryKey(BitSet primaryKey) {
+ info.primaryKey = primaryKey;
+ return this;
+ }
+
+ /** optional */
+ public Builder setMaxRecordLength(int len) {
+ info.maxRecordLength = len;
+ return this;
+ }
+
+ /** optional */
+ public Builder enableColumnBlock(BitSet[] columnBlocks) {
+ info.colBlocks = columnBlocks;
+ return this;
}
+ /** optional */
+ public Builder enableRowBlock(int rowBlockSize, boolean enableIndex) {
+ info.rowBlockSize = rowBlockSize;
+ info.rowBlockIndexEnabled = enableIndex;
+ return this;
+ }
+
+ /** optional */
+ public Builder enableSharding(int nShards) {
+ info.nShards = nShards;
+ return this;
+ }
}
-
}