You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/14 01:01:23 UTC

[48/50] [abbrv] incubator-kylin git commit: KYLIN-625, draft impl, ready for very first test

KYLIN-625, draft impl, ready for very first test


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/4e290fb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/4e290fb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/4e290fb0

Branch: refs/heads/streaming-localdict
Commit: 4e290fb07a842a874421f0ce7618d4469dbd95d0
Parents: 5059723
Author: Li, Yang <ya...@ebay.com>
Authored: Fri Mar 13 18:41:55 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Mar 13 18:41:55 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/util/ByteArray.java |  72 ++++++-
 .../org/apache/kylin/common/util/BytesUtil.java |  21 +++
 .../kylin/dict/lookup/LookupBytesTable.java     |   2 +-
 .../cube/FactDistinctColumnsCombiner.java       |   2 +-
 .../hadoop/cube/FactDistinctColumnsReducer.java |   2 +-
 .../job/hadoop/cube/NewBaseCuboidMapper.java    |   2 +-
 .../IIDistinctColumnsCombiner.java              |   2 +-
 .../invertedindex/IIDistinctColumnsReducer.java |   2 +-
 .../kylin/metadata/filter/CaseTupleFilter.java  |   7 +-
 .../metadata/filter/ColumnTupleFilter.java      |   7 +-
 .../metadata/filter/CompareTupleFilter.java     |   7 +-
 .../metadata/filter/ConstantTupleFilter.java    |   7 +-
 .../metadata/filter/DynamicTupleFilter.java     |   7 +-
 .../metadata/filter/ExtractTupleFilter.java     |   7 +-
 .../metadata/filter/IFilterCodeSystem.java      |  30 +++
 .../metadata/filter/LogicalTupleFilter.java     |  13 +-
 .../kylin/metadata/filter/StringCodeSystem.java |  15 +-
 .../kylin/metadata/filter/TupleFilter.java      |   7 +-
 .../metadata/filter/TupleFilterSerializer.java  |  11 +-
 .../metadata/measure/BigDecimalSerializer.java  |  75 --------
 .../metadata/measure/DataTypeSerializer.java    |  83 --------
 .../metadata/measure/DoubleSerializer.java      |  60 ------
 .../kylin/metadata/measure/HLLCSerializer.java  |  72 -------
 .../kylin/metadata/measure/LongSerializer.java  |  67 -------
 .../kylin/metadata/measure/MeasureCodec.java    |   2 +-
 .../serializer/BigDecimalSerializer.java        |  75 ++++++++
 .../metadata/serializer/DataTypeSerializer.java |  87 +++++++++
 .../metadata/serializer/DoubleSerializer.java   |  60 ++++++
 .../metadata/serializer/HLLCSerializer.java     |  72 +++++++
 .../metadata/serializer/LongSerializer.java     |  67 +++++++
 .../metadata/serializer/StringSerializer.java   |  30 +++
 .../kylin/metadata/tuple/ICodeSystem.java       |  30 ---
 .../storage/gridtable/GTAggregateScanner.java   | 144 ++++++++++++++
 .../kylin/storage/gridtable/GTBuilder.java      |  32 +---
 .../kylin/storage/gridtable/GTCodeSystem.java   |  76 ++++++++
 .../apache/kylin/storage/gridtable/GTInfo.java  | 188 ++++++++++++++++---
 .../kylin/storage/gridtable/GTRawScanner.java   | 169 +++++++++++++++++
 .../kylin/storage/gridtable/GTRecord.java       | 131 ++++++++++++-
 .../kylin/storage/gridtable/GTRowBlock.java     | 103 ++++++++--
 .../apache/kylin/storage/gridtable/GTStore.java |  35 ----
 .../kylin/storage/gridtable/GridTable.java      |  25 ++-
 .../kylin/storage/gridtable/IGTCodeSystem.java  |  25 +++
 .../kylin/storage/gridtable/IGTScanner.java     |   7 +
 .../kylin/storage/gridtable/IGTStore.java       |  37 ++++
 .../kylin/storage/gridtable/IKVStoreReader.java |  11 --
 .../kylin/storage/gridtable/IKVStoreWriter.java |  11 --
 .../gridtable/memstore/GTSimpleMemStore.java    |  96 ++++++++++
 .../hbase/coprocessor/DictCodeSystem.java       |  19 +-
 .../kylin/storage/filter/FilterBaseTest.java    |   3 +-
 49 files changed, 1521 insertions(+), 594 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/common/src/main/java/org/apache/kylin/common/util/ByteArray.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/ByteArray.java b/common/src/main/java/org/apache/kylin/common/util/ByteArray.java
index df6f01b..3d3a291 100644
--- a/common/src/main/java/org/apache/kylin/common/util/ByteArray.java
+++ b/common/src/main/java/org/apache/kylin/common/util/ByteArray.java
@@ -18,7 +18,7 @@
 
 package org.apache.kylin.common.util;
 
-import java.util.Arrays;
+import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -27,15 +27,65 @@ import org.apache.hadoop.hbase.util.Bytes;
  */
 public class ByteArray implements Comparable<ByteArray> {
 
-    public byte[] data;
+    public static ByteArray allocate(int length) {
+        return new ByteArray(new byte[length]);
+    }
+    
+    // ============================================================================
+
+    private byte[] data;
+    private int offset;
+    private int length;
+
+    public ByteArray() {
+        set(null, 0, 0);
+    }
 
     public ByteArray(byte[] data) {
-        this.data = data;
+        set(data, 0, data.length);
+    }
+
+    public ByteArray(byte[] data, int offset, int length) {
+        set(data, offset, length);
+    }
+
+    public byte[] array() {
+        return data;
+    }
+
+    public int offset() {
+        return offset;
+    }
+
+    public int length() {
+        return length;
+    }
+
+    public void set(byte[] array) {
+        set(array, 0, array.length);
+    }
+
+    public void set(byte[] array, int offset, int length) {
+        this.data = array;
+        this.offset = offset;
+        this.length = length;
+    }
+
+    public void set(ByteArray o) {
+        set(o.data, o.offset, o.length);
+    }
+
+    public void setLength(int length) {
+        this.length = length;
+    }
+
+    public ByteBuffer wrapAsBuffer() {
+        return ByteBuffer.wrap(data, offset, length);
     }
 
     @Override
     public int hashCode() {
-        return Bytes.hashCode(data);
+        return Bytes.hashCode(data, offset, length);
     }
 
     @Override
@@ -46,14 +96,18 @@ public class ByteArray implements Comparable<ByteArray> {
             return false;
         if (getClass() != obj.getClass())
             return false;
-        ByteArray other = (ByteArray) obj;
-        if (!Arrays.equals(data, other.data))
-            return false;
-        return true;
+        ByteArray o = (ByteArray) obj;
+        return Bytes.equals(this.data, this.offset, this.length, o.data, o.offset, o.length);
     }
 
     @Override
     public int compareTo(ByteArray o) {
-        return Bytes.compareTo(this.data, o.data);
+        return Bytes.compareTo(this.data, this.offset, this.length, o.data, o.offset, o.length);
+    }
+
+    @Override
+    public String toString() {
+        return Bytes.toString(data, offset, length);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/BytesUtil.java b/common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
index 2cd8d7d..dbd459d 100644
--- a/common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
+++ b/common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
@@ -311,6 +311,15 @@ public class BytesUtil {
         out.put(array);
     }
 
+    public static void writeByteArray(byte[] array, int offset, int length, ByteBuffer out) {
+        if (array == null) {
+            writeVInt(-1, out); // a null array is encoded as a -1 length prefix and no payload
+            return;
+        }
+        writeVInt(length, out); // prefix must equal the byte count put below, not array.length
+        out.put(array, offset, length);
+    }
+
     public static byte[] readByteArray(ByteBuffer in) {
         int len = readVInt(in);
         if (len < 0)
@@ -320,6 +329,18 @@ public class BytesUtil {
         in.get(array);
         return array;
     }
+    
+    public static int peekByteArrayLength(ByteBuffer in) {
+        int start = in.position();
+        int arrayLen = readVInt(in);
+        int sizeLen = in.position() - start; // number of bytes the VInt length prefix occupies
+        in.position(start); // rewind so that peeking leaves the buffer position unchanged
+        
+        if (arrayLen < 0)
+            return sizeLen; // negative prefix: only the prefix itself is counted
+        else
+            return sizeLen + arrayLen; // prefix bytes plus the payload length it announces
+    }
 
     public static void writeBooleanArray(boolean[] array, ByteBuffer out) {
         if (array == null) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupBytesTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupBytesTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupBytesTable.java
index 0b40be0..6e5f7d9 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupBytesTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupBytesTable.java
@@ -46,7 +46,7 @@ public class LookupBytesTable extends LookupTable<ByteArray> {
 
     @Override
     protected String toString(ByteArray cell) {
-        return Bytes.toString(cell.data);
+        return cell.toString();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsCombiner.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsCombiner.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsCombiner.java
index c9a7a34..b3912d9 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsCombiner.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsCombiner.java
@@ -50,7 +50,7 @@ public class FactDistinctColumnsCombiner extends KylinReducer<ShortWritable, Tex
         }
 
         for (ByteArray value : set) {
-            outputValue.set(value.data);
+            outputValue.set(value.array(), value.offset(), value.length());
             context.write(key, outputValue);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
index fdae135..15b4970 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
@@ -82,7 +82,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text
 
         try {
             for (ByteArray value : set) {
-                out.write(value.data);
+                out.write(value.array(), value.offset(), value.length());
                 out.write('\n');
             }
         } finally {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
index 9dda89c..e75457e 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
@@ -256,7 +256,7 @@ public class NewBaseCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Text, Text, T
                 }
             } else {
                 for (Pair<Integer, Integer> relation : tableJoin.dimTblColAsRowKey) {
-                    keyBytesBuf[relation.getSecond()] = dimRow[relation.getFirst()].data;
+                    keyBytesBuf[relation.getSecond()] = dimRow[relation.getFirst()].array();
                     filledDimension++;
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java
index dc04d6e..dcfd6f6 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java
@@ -51,7 +51,7 @@ public class IIDistinctColumnsCombiner extends KylinReducer<ShortWritable, Text,
         }
 
         for (ByteArray value : set) {
-            outputValue.set(value.data);
+            outputValue.set(value.array(), value.offset(), value.length());
             context.write(key, outputValue);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
index d8f9543..e5eb65e 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
@@ -66,7 +66,7 @@ public class IIDistinctColumnsReducer extends KylinReducer<ShortWritable, Text,
 
         try {
             for (ByteArray value : set) {
-                out.write(value.data);
+                out.write(value.array(), value.offset(), value.length());
                 out.write('\n');
             }
         } finally {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java b/metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java
index 4b85172..e8ccd27 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java
@@ -23,7 +23,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.kylin.metadata.tuple.ICodeSystem;
 import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
 
 
@@ -65,7 +64,7 @@ public class CaseTupleFilter extends TupleFilter {
     }
 
     @Override
-    public boolean evaluate(IEvaluatableTuple tuple, ICodeSystem<?> cs) {
+    public boolean evaluate(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs) {
         if (whenFilters.size() != thenFilters.size()) {
             elseFilter = whenFilters.remove(whenFilters.size() - 1);
         }
@@ -103,12 +102,12 @@ public class CaseTupleFilter extends TupleFilter {
     }
 
     @Override
-    public byte[] serialize(ICodeSystem<?> cs) {
+    public byte[] serialize(IFilterCodeSystem<?> cs) {
         return new byte[0];
     }
 
     @Override
-    public void deserialize(byte[] bytes, ICodeSystem<?> cs) {
+    public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java b/metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
index 75f534d..f689ccb 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
@@ -28,7 +28,6 @@ import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.ICodeSystem;
 import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
 
 /**
@@ -68,7 +67,7 @@ public class ColumnTupleFilter extends TupleFilter {
     }
 
     @Override
-    public boolean evaluate(IEvaluatableTuple tuple, ICodeSystem<?> cs) {
+    public boolean evaluate(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs) {
         this.tupleValue = tuple.getValue(columnRef);
         return true;
     }
@@ -85,7 +84,7 @@ public class ColumnTupleFilter extends TupleFilter {
     }
 
     @Override
-    public byte[] serialize(ICodeSystem<?> cs) {
+    public byte[] serialize(IFilterCodeSystem<?> cs) {
         ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
         String table = columnRef.getTable();
         BytesUtil.writeUTFString(table, buffer);
@@ -105,7 +104,7 @@ public class ColumnTupleFilter extends TupleFilter {
     }
 
     @Override
-    public void deserialize(byte[] bytes, ICodeSystem<?> cs) {
+    public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
         TableDesc table = null;
         ColumnDesc column = new ColumnDesc();
         ByteBuffer buffer = ByteBuffer.wrap(bytes);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java b/metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
index 71950ff..899320f 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
@@ -27,7 +27,6 @@ import java.util.Map;
 
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.ICodeSystem;
 import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
 
 /**
@@ -131,7 +130,7 @@ public class CompareTupleFilter extends TupleFilter {
     // TODO requires generalize, currently only evaluates COLUMN {op} CONST
     @Override
     @SuppressWarnings({ "unchecked", "rawtypes" })
-    public boolean evaluate(IEvaluatableTuple tuple, ICodeSystem cs) {
+    public boolean evaluate(IEvaluatableTuple tuple, IFilterCodeSystem cs) {
         // extract tuple value
         Object tupleValue = null;
         for (TupleFilter filter : this.children) {
@@ -199,7 +198,7 @@ public class CompareTupleFilter extends TupleFilter {
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
     @Override
-    public byte[] serialize(ICodeSystem cs) {
+    public byte[] serialize(IFilterCodeSystem cs) {
         ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
         int size = this.dynamicVariables.size();
         BytesUtil.writeVInt(size, buffer);
@@ -213,7 +212,7 @@ public class CompareTupleFilter extends TupleFilter {
     }
 
     @Override
-    public void deserialize(byte[] bytes, ICodeSystem<?> cs) {
+    public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
         this.dynamicVariables.clear();
         ByteBuffer buffer = ByteBuffer.wrap(bytes);
         int size = BytesUtil.readVInt(buffer);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java b/metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java
index c92b743..f372b4a 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java
@@ -24,7 +24,6 @@ import java.util.Collections;
 import java.util.HashSet;
 
 import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.metadata.tuple.ICodeSystem;
 import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
 
 /**
@@ -65,7 +64,7 @@ public class ConstantTupleFilter extends TupleFilter {
     }
 
     @Override
-    public boolean evaluate(IEvaluatableTuple tuple, ICodeSystem<?> cs) {
+    public boolean evaluate(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs) {
         return constantValues.size() > 0;
     }
 
@@ -81,7 +80,7 @@ public class ConstantTupleFilter extends TupleFilter {
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
     @Override
-    public byte[] serialize(ICodeSystem cs) {
+    public byte[] serialize(IFilterCodeSystem cs) {
         ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
         int size = this.constantValues.size();
         BytesUtil.writeVInt(size, buffer);
@@ -94,7 +93,7 @@ public class ConstantTupleFilter extends TupleFilter {
     }
 
     @Override
-    public void deserialize(byte[] bytes, ICodeSystem<?> cs) {
+    public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
         this.constantValues.clear();
         ByteBuffer buffer = ByteBuffer.wrap(bytes);
         int size = BytesUtil.readVInt(buffer);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java b/metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java
index 3d7f65f..a482519 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java
@@ -23,7 +23,6 @@ import java.util.Collection;
 import java.util.Collections;
 
 import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.metadata.tuple.ICodeSystem;
 import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
 
 /**
@@ -55,7 +54,7 @@ public class DynamicTupleFilter extends TupleFilter {
     }
 
     @Override
-    public boolean evaluate(IEvaluatableTuple tuple, ICodeSystem<?> cs) {
+    public boolean evaluate(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs) {
         return true;
     }
 
@@ -70,7 +69,7 @@ public class DynamicTupleFilter extends TupleFilter {
     }
 
     @Override
-    public byte[] serialize(ICodeSystem<?> cs) {
+    public byte[] serialize(IFilterCodeSystem<?> cs) {
         ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
         BytesUtil.writeUTFString(variableName, buffer);
         byte[] result = new byte[buffer.position()];
@@ -79,7 +78,7 @@ public class DynamicTupleFilter extends TupleFilter {
     }
 
     @Override
-    public void deserialize(byte[] bytes, ICodeSystem<?> cs) {
+    public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
         ByteBuffer buffer = ByteBuffer.wrap(bytes);
         this.variableName = BytesUtil.readUTFString(buffer);
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java b/metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
index ad1395c..a7705d3 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.kylin.metadata.tuple.ICodeSystem;
 import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
 
 
@@ -55,7 +54,7 @@ public class ExtractTupleFilter extends TupleFilter {
     }
 
     @Override
-    public boolean evaluate(IEvaluatableTuple tuple, ICodeSystem<?> cs) {
+    public boolean evaluate(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs) {
         // extract tuple value
         String extractType = null;
         String tupleValue = null;
@@ -115,12 +114,12 @@ public class ExtractTupleFilter extends TupleFilter {
     }
 
     @Override
-    public byte[] serialize(ICodeSystem<?> cs) {
+    public byte[] serialize(IFilterCodeSystem<?> cs) {
         return new byte[0];
     }
 
     @Override
-    public void deserialize(byte[] bytes, ICodeSystem<?> cs) {
+    public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/filter/IFilterCodeSystem.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/filter/IFilterCodeSystem.java b/metadata/src/main/java/org/apache/kylin/metadata/filter/IFilterCodeSystem.java
new file mode 100644
index 0000000..5382dcc
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/metadata/filter/IFilterCodeSystem.java
@@ -0,0 +1,30 @@
+package org.apache.kylin.metadata.filter;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Decides how constant values are coded and compared.
+ * 
+ * TupleFilters are used in both the query engine and the coprocessor. In the query engine, the values
+ * are strings; in the coprocessor, the values are dictionary IDs.
+ * 
+ * The type parameter {@code T} is the Java type of a code, which should be bytes. However, some legacy
+ * implementations store codes as String.
+ * 
+ * @author yangli9
+ */
+public interface IFilterCodeSystem<T> {
+    
+    /** Tells whether the given code represents the NULL value. */
+    boolean isNull(T code);
+
+    /** Compares two values by their codes. */
+    int compare(T code1, T code2);
+
+    /** Writes the code to the buffer. */
+    void serialize(T code, ByteBuffer buf);
+
+    /** Reads a code from the buffer. */
+    T deserialize(ByteBuffer buf);
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java b/metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
index 0c8f96b..1844392 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
@@ -24,7 +24,6 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 
-import org.apache.kylin.metadata.tuple.ICodeSystem;
 import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
 
 
@@ -72,7 +71,7 @@ public class LogicalTupleFilter extends TupleFilter {
     }
 
     @Override
-    public boolean evaluate(IEvaluatableTuple tuple, ICodeSystem<?> cs) {
+    public boolean evaluate(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs) {
         switch (this.operator) {
         case AND:
             return evalAnd(tuple, cs);
@@ -85,7 +84,7 @@ public class LogicalTupleFilter extends TupleFilter {
         }
     }
 
-    private boolean evalAnd(IEvaluatableTuple tuple, ICodeSystem<?> cs) {
+    private boolean evalAnd(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs) {
         for (TupleFilter filter : this.children) {
             if (!filter.evaluate(tuple, cs)) {
                 return false;
@@ -94,7 +93,7 @@ public class LogicalTupleFilter extends TupleFilter {
         return true;
     }
 
-    private boolean evalOr(IEvaluatableTuple tuple, ICodeSystem<?> cs) {
+    private boolean evalOr(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs) {
         for (TupleFilter filter : this.children) {
             if (filter.evaluate(tuple, cs)) {
                 return true;
@@ -103,7 +102,7 @@ public class LogicalTupleFilter extends TupleFilter {
         return false;
     }
 
-    private boolean evalNot(IEvaluatableTuple tuple, ICodeSystem<?> cs) {
+    private boolean evalNot(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs) {
         return !this.children.get(0).evaluate(tuple, cs);
     }
 
@@ -118,12 +117,12 @@ public class LogicalTupleFilter extends TupleFilter {
     }
 
     @Override
-    public byte[] serialize(ICodeSystem<?> cs) {
+    public byte[] serialize(IFilterCodeSystem<?> cs) {
         return new byte[0];
     }
 
     @Override
-    public void deserialize(byte[] bytes, ICodeSystem<?> cs) {
+    public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/filter/StringCodeSystem.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/filter/StringCodeSystem.java b/metadata/src/main/java/org/apache/kylin/metadata/filter/StringCodeSystem.java
index 5240491..5b0040d 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/filter/StringCodeSystem.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/filter/StringCodeSystem.java
@@ -3,32 +3,21 @@ package org.apache.kylin.metadata.filter;
 import java.nio.ByteBuffer;
 
 import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.metadata.tuple.ICodeSystem;
 
 /**
  * A simple code system where all values are strings and conform to string comparison system.
  * 
  * @author yangli9
  */
-public class StringCodeSystem implements ICodeSystem<String> {
+public class StringCodeSystem implements IFilterCodeSystem<String> {
     
     public static final StringCodeSystem INSTANCE = new StringCodeSystem();
     
-    private StringCodeSystem() {
+    protected StringCodeSystem() {
         // singleton
     }
 
     @Override
-    public String encode(Object value) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Object decode(String code) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
     public boolean isNull(String value) {
         return value == null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java b/metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
index 75d55c3..5dfce7d 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
@@ -25,7 +25,6 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.ICodeSystem;
 import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
 
 import com.google.common.collect.Maps;
@@ -190,13 +189,13 @@ public abstract class TupleFilter {
 
     public abstract boolean isEvaluable();
 
-    public abstract boolean evaluate(IEvaluatableTuple tuple, ICodeSystem<?> cs);
+    public abstract boolean evaluate(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs);
 
     public abstract Collection<?> getValues();
 
-    abstract byte[] serialize(ICodeSystem<?> cs);
+    abstract byte[] serialize(IFilterCodeSystem<?> cs);
 
-    abstract void deserialize(byte[] bytes, ICodeSystem<?> cs);
+    abstract void deserialize(byte[] bytes, IFilterCodeSystem<?> cs);
 
     public static boolean isEvaluableRecursively(TupleFilter filter) {
         if (filter == null)

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java
index 5164c5b..f824777 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java
@@ -24,7 +24,6 @@ import java.util.Map;
 import java.util.Stack;
 
 import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.metadata.tuple.ICodeSystem;
 
 /**
  * http://eli.thegreenplace.net/2011/09/29/an-interesting-tree-serialization-algorithm-from-dwarf
@@ -47,11 +46,11 @@ public class TupleFilterSerializer {
         }
     }
 
-    public static byte[] serialize(TupleFilter rootFilter, ICodeSystem<?> cs) {
+    public static byte[] serialize(TupleFilter rootFilter, IFilterCodeSystem<?> cs) {
         return serialize(rootFilter, null, cs);
     }
 
-    public static byte[] serialize(TupleFilter rootFilter, Decorator decorator, ICodeSystem<?> cs) {
+    public static byte[] serialize(TupleFilter rootFilter, Decorator decorator, IFilterCodeSystem<?> cs) {
         ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
         internalSerialize(rootFilter, decorator, buffer, cs);
         byte[] result = new byte[buffer.position()];
@@ -59,7 +58,7 @@ public class TupleFilterSerializer {
         return result;
     }
 
-    private static void internalSerialize(TupleFilter filter, Decorator decorator, ByteBuffer buffer, ICodeSystem<?> cs) {
+    private static void internalSerialize(TupleFilter filter, Decorator decorator, ByteBuffer buffer, IFilterCodeSystem<?> cs) {
         if (decorator != null) { // give decorator a chance to manipulate the
                                  // output filter
             filter = decorator.onSerialize(filter);
@@ -84,7 +83,7 @@ public class TupleFilterSerializer {
         }
     }
 
-    private static void serializeFilter(int flag, TupleFilter filter, Decorator decorator, ByteBuffer buffer, ICodeSystem<?> cs) {
+    private static void serializeFilter(int flag, TupleFilter filter, Decorator decorator, ByteBuffer buffer, IFilterCodeSystem<?> cs) {
         if (flag < 0) {
             BytesUtil.writeVInt(-1, buffer);
         } else {
@@ -96,7 +95,7 @@ public class TupleFilterSerializer {
         }
     }
 
-    public static TupleFilter deserialize(byte[] bytes, ICodeSystem<?> cs) {
+    public static TupleFilter deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
         ByteBuffer buffer = ByteBuffer.wrap(bytes);
         TupleFilter rootFilter = null;
         Stack<TupleFilter> parentStack = new Stack<TupleFilter>();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalSerializer.java
deleted file mode 100644
index 78654ea..0000000
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalSerializer.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata.measure;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-
-/**
- * @author yangli9
- * 
- */
-public class BigDecimalSerializer extends DataTypeSerializer<BigDecimal> {
-
-    @Override
-    public void serialize(BigDecimal value, ByteBuffer out) {
-        byte[] bytes = value.unscaledValue().toByteArray();
-
-        BytesUtil.writeVInt(value.scale(), out);
-        BytesUtil.writeVInt(bytes.length, out);
-        out.put(bytes);
-    }
-
-    @Override
-    public BigDecimal deserialize(ByteBuffer in) {
-        int scale = BytesUtil.readVInt(in);
-        int n = BytesUtil.readVInt(in);
-
-        byte[] bytes = new byte[n];
-        in.get(bytes);
-
-        return new BigDecimal(new BigInteger(bytes), scale);
-    }
-
-
-    @Override
-    public int peekLength(ByteBuffer in) {
-        int mark = in.position();
-        
-        @SuppressWarnings("unused")
-        int scale = BytesUtil.readVInt(in);
-        int n = BytesUtil.readVInt(in);
-        int len = in.position() - mark + n;
-        
-        in.position(mark);
-        return len;
-    }
-
-    @Override
-    public BigDecimal valueOf(byte[] value) {
-        if (value == null)
-            return new BigDecimal(0);
-        else
-            return new BigDecimal(Bytes.toString(value));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/measure/DataTypeSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/DataTypeSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/DataTypeSerializer.java
deleted file mode 100644
index ecaaea7..0000000
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/DataTypeSerializer.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata.measure;
-
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-
-import org.apache.kylin.common.util.BytesSerializer;
-import org.apache.kylin.metadata.model.DataType;
-
-/**
- * @author yangli9
- * 
- */
-abstract public class DataTypeSerializer<T> implements BytesSerializer<T> {
-
-    final static HashMap<String, Class<?>> implementations = new HashMap<String, Class<?>>();
-    static {
-        implementations.put("decimal", BigDecimalSerializer.class);
-        implementations.put("double", DoubleSerializer.class);
-        implementations.put("float", DoubleSerializer.class);
-        implementations.put("bigint", LongSerializer.class);
-        implementations.put("long", LongSerializer.class);
-        implementations.put("integer", LongSerializer.class);
-        implementations.put("int", LongSerializer.class);
-    }
-
-    public static DataTypeSerializer<?> create(String dataType) {
-        DataType type = DataType.getInstance(dataType);
-        if (type.isHLLC()) {
-            return new HLLCSerializer(type.getPrecision());
-        }
-
-        Class<?> clz = implementations.get(type.getName());
-        if (clz == null)
-            throw new RuntimeException("No MeasureSerializer for type " + dataType);
-
-        try {
-            return (DataTypeSerializer<?>) clz.newInstance();
-        } catch (Exception e) {
-            throw new RuntimeException(e); // never happen
-        }
-    }
-    
-    /** peek into buffer and return the length of serialization */
-    abstract public int peekLength(ByteBuffer in);
-    
-    /** convert from String to obj */
-    abstract public T valueOf(byte[] value);
-    
-    public T valueOf(String value) {
-        try {
-            return valueOf(value.getBytes("UTF-8"));
-        } catch (UnsupportedEncodingException e) {
-            throw new RuntimeException(e); // never happen
-        }
-    }
-
-    /** convert from obj to string */
-    public String toString(T value) {
-        if (value == null)
-            return "NULL";
-        else
-            return value.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleSerializer.java
deleted file mode 100644
index 2874a57..0000000
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleSerializer.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata.measure;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.DoubleWritable;
-
-/**
- * @author yangli9
- * 
- */
-public class DoubleSerializer extends DataTypeSerializer<DoubleWritable> {
-
-    // avoid mass object creation
-    DoubleWritable current = new DoubleWritable();
-
-    @Override
-    public void serialize(DoubleWritable value, ByteBuffer out) {
-        out.putDouble(value.get());
-    }
-
-    @Override
-    public DoubleWritable deserialize(ByteBuffer in) {
-        current.set(in.getDouble());
-        return current;
-    }
-
-    @Override
-    public int peekLength(ByteBuffer in) {
-        return 8;
-    }
-
-    @Override
-    public DoubleWritable valueOf(byte[] value) {
-        if (value == null)
-            current.set(0d);
-        else
-            current.set(Double.parseDouble(Bytes.toString(value)));
-        return current;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCSerializer.java
deleted file mode 100644
index 2303c82..0000000
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCSerializer.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata.measure;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-
-/**
- * @author yangli9
- * 
- */
-public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounter> {
-
-    HyperLogLogPlusCounter current;
-
-    public HLLCSerializer(int p) {
-        current = new HyperLogLogPlusCounter(p);
-    }
-
-    @Override
-    public void serialize(HyperLogLogPlusCounter value, ByteBuffer out) {
-        try {
-            value.writeRegisters(out);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public HyperLogLogPlusCounter deserialize(ByteBuffer in) {
-        try {
-            current.readRegisters(in);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-        return current;
-    }
-
-    @Override
-    public int peekLength(ByteBuffer in) {
-        return current.peekLength(in);
-    }
-
-    @Override
-    public HyperLogLogPlusCounter valueOf(byte[] value) {
-        current.clear();
-        if (value == null)
-            current.add("__nUlL__");
-        else
-            current.add(value);
-        return current;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/measure/LongSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/LongSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/LongSerializer.java
deleted file mode 100644
index 9dca987..0000000
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/LongSerializer.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata.measure;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.kylin.common.util.BytesUtil;
-
-/**
- * @author yangli9
- * 
- */
-public class LongSerializer extends DataTypeSerializer<LongWritable> {
-
-    // avoid mass object creation
-    LongWritable current = new LongWritable();
-
-    @Override
-    public void serialize(LongWritable value, ByteBuffer out) {
-        BytesUtil.writeVLong(value.get(), out);
-    }
-
-    @Override
-    public LongWritable deserialize(ByteBuffer in) {
-        current.set(BytesUtil.readVLong(in));
-        return current;
-    }
-
-    @Override
-    public int peekLength(ByteBuffer in) {
-        int mark = in.position();
-        
-        BytesUtil.readVLong(in);
-        int len = in.position() - mark;
-        
-        in.position(mark);
-        return len;
-    }
-
-    @Override
-    public LongWritable valueOf(byte[] value) {
-        if (value == null)
-            current.set(0L);
-        else
-            current.set(Long.parseLong(Bytes.toString(value)));
-        return current;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureCodec.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureCodec.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureCodec.java
index 95a246c..9da0fbe 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureCodec.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureCodec.java
@@ -22,8 +22,8 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 
 import org.apache.hadoop.io.Text;
-
 import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.serializer.DataTypeSerializer;
 
 /**
  * @author yangli9

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/serializer/BigDecimalSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/serializer/BigDecimalSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/serializer/BigDecimalSerializer.java
new file mode 100644
index 0000000..0c7b37d
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/metadata/serializer/BigDecimalSerializer.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.serializer;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+
+/**
+ * Serializes a BigDecimal as: scale (vint), byte count (vint), then the unscaled value's bytes.
+ * @author yangli9
+ */
+public class BigDecimalSerializer extends DataTypeSerializer<BigDecimal> {
+
+    @Override
+    public void serialize(BigDecimal value, ByteBuffer out) { // value must be non-null, it is dereferenced below
+        byte[] bytes = value.unscaledValue().toByteArray(); // bytes of the unscaled BigInteger
+
+        BytesUtil.writeVInt(value.scale(), out); // 1) scale
+        BytesUtil.writeVInt(bytes.length, out); // 2) number of bytes that follow
+        out.put(bytes); // 3) the unscaled value itself
+    }
+
+    @Override
+    public BigDecimal deserialize(ByteBuffer in) { // reads the fields in the order serialize() wrote them
+        int scale = BytesUtil.readVInt(in);
+        int n = BytesUtil.readVInt(in); // byte count of the unscaled value
+
+        byte[] bytes = new byte[n];
+        in.get(bytes);
+
+        return new BigDecimal(new BigInteger(bytes), scale);
+    }
+
+
+    @Override
+    public int peekLength(ByteBuffer in) { // size of the value at the current position; the position is restored
+        int mark = in.position();
+        
+        @SuppressWarnings("unused")
+        int scale = BytesUtil.readVInt(in); // read only to advance past the scale field
+        int n = BytesUtil.readVInt(in);
+        int len = in.position() - mark + n; // bytes taken by the two vints plus the payload they announce
+        
+        in.position(mark); // rewind so the caller can still deserialize from here
+        return len;
+    }
+
+    @Override
+    public BigDecimal valueOf(byte[] value) { // turns the bytes into a String (Bytes.toString) and parses it
+        if (value == null)
+            return new BigDecimal(0); // null input maps to zero
+        else
+            return new BigDecimal(Bytes.toString(value));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/serializer/DataTypeSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/serializer/DataTypeSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/serializer/DataTypeSerializer.java
new file mode 100644
index 0000000..5828178
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/metadata/serializer/DataTypeSerializer.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.serializer;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+
+import org.apache.kylin.common.util.BytesSerializer;
+import org.apache.kylin.metadata.model.DataType;
+
+/**
+ * Base class of the per-data-type serializers, plus a factory that picks one by type name.
+ * @author yangli9
+ */
+abstract public class DataTypeSerializer<T> implements BytesSerializer<T> {
+
+    final static HashMap<String, Class<?>> implementations = new HashMap<String, Class<?>>(); // type name -> serializer class
+    static {
+        implementations.put("string", StringSerializer.class);
+        implementations.put("decimal", BigDecimalSerializer.class);
+        implementations.put("double", DoubleSerializer.class);
+        implementations.put("float", DoubleSerializer.class);
+        implementations.put("bigint", LongSerializer.class);
+        implementations.put("long", LongSerializer.class);
+        implementations.put("integer", LongSerializer.class);
+        implementations.put("int", LongSerializer.class);
+    }
+
+    public static DataTypeSerializer<?> create(String dataType) { // convenience overload taking the type as text
+        return create(DataType.getInstance(dataType));
+    }
+    
+    public static DataTypeSerializer<?> create(DataType type) { // returns a new serializer instance on every call
+        if (type.isHLLC()) {
+            return new HLLCSerializer(type.getPrecision()); // built directly because it needs the precision
+        }
+
+        Class<?> clz = implementations.get(type.getName());
+        if (clz == null)
+            throw new RuntimeException("No MeasureSerializer for type " + type);
+
+        try {
+            return (DataTypeSerializer<?>) clz.newInstance(); // uses the class's no-arg constructor
+        } catch (Exception e) {
+            throw new RuntimeException(e); // not expected for the classes registered above
+        }
+    }
+    
+    /** peek into buffer and return the byte length of the next serialized value */
+    abstract public int peekLength(ByteBuffer in);
+    
+    /** convert the bytes of a string form into a value; the implementations in this package also accept null */
+    abstract public T valueOf(byte[] value);
+    
+    public T valueOf(String value) { // null is forwarded as null bytes instead of failing with an NPE here
+        try {
+            return valueOf(value == null ? null : value.getBytes("UTF-8"));
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e); // should never happen: every JVM supports UTF-8
+        }
+    }
+
+    /** convert from obj to string; a null value is rendered as the text "NULL" */
+    public String toString(T value) {
+        if (value == null)
+            return "NULL";
+        else
+            return value.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/serializer/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/serializer/DoubleSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/serializer/DoubleSerializer.java
new file mode 100644
index 0000000..455a9d6
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/metadata/serializer/DoubleSerializer.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.serializer;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DoubleWritable;
+
+/**
+ * Serializes DoubleWritable values with ByteBuffer.putDouble / getDouble (8 bytes each).
+ * @author yangli9
+ */
+public class DoubleSerializer extends DataTypeSerializer<DoubleWritable> {
+
+    // avoid mass object creation: this one instance is what deserialize() and valueOf() return
+    DoubleWritable current = new DoubleWritable();
+
+    @Override
+    public void serialize(DoubleWritable value, ByteBuffer out) { // value must be non-null
+        out.putDouble(value.get());
+    }
+
+    @Override
+    public DoubleWritable deserialize(ByteBuffer in) { // overwrites and returns the shared instance
+        current.set(in.getDouble());
+        return current; // callers must copy the number out if they need it after the next call
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) { // fixed size, the buffer is not inspected
+        return 8;
+    }
+
+    @Override
+    public DoubleWritable valueOf(byte[] value) { // parses the bytes as text; overwrites the shared instance
+        if (value == null)
+            current.set(0d); // null input maps to zero
+        else
+            current.set(Double.parseDouble(Bytes.toString(value)));
+        return current;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/serializer/HLLCSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/serializer/HLLCSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/serializer/HLLCSerializer.java
new file mode 100644
index 0000000..dafe661
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/metadata/serializer/HLLCSerializer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.serializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+
+/**
+ * Serializes HyperLogLogPlusCounter by delegating to the counter's writeRegisters / readRegisters.
+ * @author yangli9
+ */
+public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounter> {
+
+    HyperLogLogPlusCounter current; // single counter reused as the return value of deserialize() and valueOf()
+
+    public HLLCSerializer(int p) { // p is handed unchanged to the HyperLogLogPlusCounter constructor
+        current = new HyperLogLogPlusCounter(p);
+    }
+
+    @Override
+    public void serialize(HyperLogLogPlusCounter value, ByteBuffer out) { // value must be non-null
+        try {
+            value.writeRegisters(out);
+        } catch (IOException e) {
+            throw new RuntimeException(e); // rethrown unchecked; this method declares no checked exception
+        }
+    }
+
+    @Override
+    public HyperLogLogPlusCounter deserialize(ByteBuffer in) { // loads the registers into the shared counter
+        try {
+            current.readRegisters(in);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return current; // same object on every call; its content is replaced by the next call
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) { // delegates to the counter's own peekLength
+        return current.peekLength(in);
+    }
+
+    @Override
+    public HyperLogLogPlusCounter valueOf(byte[] value) { // returns the shared counter holding just this one value
+        current.clear(); // drop whatever a previous call left in the shared counter
+        if (value == null)
+            current.add("__nUlL__"); // null is counted through this fixed placeholder string
+        else
+            current.add(value);
+        return current;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/serializer/LongSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/serializer/LongSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/serializer/LongSerializer.java
new file mode 100644
index 0000000..5916e58
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/metadata/serializer/LongSerializer.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.serializer;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.common.util.BytesUtil;
+
+/**
+ * Serializes LongWritable values with BytesUtil.writeVLong / readVLong.
+ * @author yangli9
+ */
+public class LongSerializer extends DataTypeSerializer<LongWritable> {
+
+    // avoid mass object creation: this one instance is what deserialize() and valueOf() return
+    LongWritable current = new LongWritable();
+
+    @Override
+    public void serialize(LongWritable value, ByteBuffer out) { // value must be non-null
+        BytesUtil.writeVLong(value.get(), out);
+    }
+
+    @Override
+    public LongWritable deserialize(ByteBuffer in) { // overwrites and returns the shared instance
+        current.set(BytesUtil.readVLong(in));
+        return current; // callers must copy the number out if they need it after the next call
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) { // size of the vlong at the current position; the position is restored
+        int mark = in.position();
+        
+        BytesUtil.readVLong(in); // value is discarded, the read only advances the position
+        int len = in.position() - mark; // bytes the read consumed
+        
+        in.position(mark); // rewind so the caller can still deserialize from here
+        return len;
+    }
+
+    @Override
+    public LongWritable valueOf(byte[] value) { // parses the bytes as text; overwrites the shared instance
+        if (value == null)
+            current.set(0L); // null input maps to zero
+        else
+            current.set(Long.parseLong(Bytes.toString(value)));
+        return current;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/serializer/StringSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/serializer/StringSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/serializer/StringSerializer.java
new file mode 100644
index 0000000..56725c0
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/metadata/serializer/StringSerializer.java
@@ -0,0 +1,30 @@
+package org.apache.kylin.metadata.serializer;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+
+/**
+ * {@link DataTypeSerializer} for {@code String} values. Every operation is a thin
+ * delegation to {@code BytesUtil} or to HBase {@code Bytes}.
+ */
+public class StringSerializer extends DataTypeSerializer<String> {
+
+    /** Writes {@code value} to {@code out} through {@code BytesUtil.writeUTFString}. */
+    @Override
+    public void serialize(String value, ByteBuffer out) {
+        BytesUtil.writeUTFString(value, out);
+    }
+
+    /** Reads one string from {@code in} through {@code BytesUtil.readUTFString}. */
+    @Override
+    public String deserialize(ByteBuffer in) {
+        final String decoded = BytesUtil.readUTFString(in);
+        return decoded;
+    }
+
+    /** Returns whatever {@code BytesUtil.peekByteArrayLength} reports for {@code in}. */
+    @Override
+    public int peekLength(ByteBuffer in) {
+        final int length = BytesUtil.peekByteArrayLength(in);
+        return length;
+    }
+
+    /** Converts raw bytes to a string through {@code Bytes.toString}. */
+    @Override
+    public String valueOf(byte[] value) {
+        final String text = Bytes.toString(value);
+        return text;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/metadata/src/main/java/org/apache/kylin/metadata/tuple/ICodeSystem.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/tuple/ICodeSystem.java b/metadata/src/main/java/org/apache/kylin/metadata/tuple/ICodeSystem.java
deleted file mode 100644
index b5bc025..0000000
--- a/metadata/src/main/java/org/apache/kylin/metadata/tuple/ICodeSystem.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package org.apache.kylin.metadata.tuple;
-
-import java.nio.ByteBuffer;
-
-/**
- * Decides how constant values are coded and compared.
- * 
- * TupleFilter are involved in both query engine and coprocessor. In query engine, the values are strings.
- * In coprocessor, the values are dictionary IDs.
- * 
- * The type parameter is java type of code, which should be bytes. However some legacy implementation
- * stores code as String.
- * 
- * @author yangli9
- */
-public interface ICodeSystem<T> {
-    
-    T encode(Object value);
-    
-    Object decode(T code);
-
-    boolean isNull(T code);
-
-    int compare(T code1, T code2);
-
-    void serialize(T code, ByteBuffer buffer);
-
-    T deserialize(ByteBuffer code);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
new file mode 100644
index 0000000..1a14a14
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
@@ -0,0 +1,144 @@
+package org.apache.kylin.storage.gridtable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.measure.MeasureAggregator;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Scanner that aggregates the output of a {@link GTRawScanner}: records are grouped
+ * by the requested dimension columns and the requested metrics columns are fed into
+ * {@link MeasureAggregator}s.
+ * <p>
+ * Aggregation is eager. Every call to {@link #iterator()} drains the raw scanner into
+ * an in-memory sorted map before the first aggregated record is handed out.
+ */
+public class GTAggregateScanner implements IGTScanner {
+
+    final GTInfo info;
+    final BitSet dimensions;
+    final BitSet metrics;
+    final TupleFilter filter;
+    final GTRawScanner rawScanner;
+
+    /**
+     * Creates the scanner on top of a new {@link GTRawScanner} built from the same arguments.
+     *
+     * @throws IllegalStateException if {@code dimensions} contains a column that {@code info}
+     *             marks as metrics, or {@code metrics} contains a column marked as dimension
+     */
+    GTAggregateScanner(GTInfo info, IGTStore store, GTRecord pkStart, GTRecord pkEndExclusive, BitSet dimensions, BitSet metrics, TupleFilter filter) {
+        if (dimensions.intersects(info.colIsMetrics) || metrics.intersects(info.colIsDimension))
+            throw new IllegalStateException();
+
+        this.info = info;
+        this.dimensions = dimensions;
+        this.metrics = metrics;
+        this.filter = filter;
+        this.rawScanner = new GTRawScanner(info, store, pkStart, pkEndExclusive, dimensions, metrics, filter);
+    }
+
+    /** Closes the underlying raw scanner. */
+    @Override
+    public void close() throws IOException {
+        rawScanner.close();
+    }
+
+    /**
+     * Reads every record of the raw scanner, aggregates it, and returns an iterator over
+     * the aggregated result. All the work happens inside this call.
+     */
+    @Override
+    public Iterator<GTRecord> iterator() {
+        AggregationCache aggrCache = new AggregationCache();
+        for (GTRecord r : rawScanner) {
+            aggrCache.aggregate(r);
+        }
+        return aggrCache.iterator();
+    }
+
+    /**
+     * In-memory aggregation buffer: a sorted map from a dimensions-only record to one
+     * aggregator per requested metrics column (in ascending column order).
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    class AggregationCache {
+        final SortedMap<GTRecord, MeasureAggregator[]> aggBufMap;
+
+        public AggregationCache() {
+            this.aggBufMap = Maps.newTreeMap();
+        }
+
+        /**
+         * Folds the metrics of {@code r} into the aggregators of its dimension group,
+         * creating the group on first sight.
+         */
+        void aggregate(GTRecord r) {
+            // NOTE(review): presumably limits the record's equals/hash/compare to the
+            // dimension columns so it can serve as a lookup key -- confirm in GTRecord
+            r.maskForEqualHashComp = dimensions;
+            MeasureAggregator[] aggrs = aggBufMap.get(r);
+            if (aggrs == null) {
+                aggrs = new MeasureAggregator[metrics.cardinality()];
+                for (int i = 0, col = -1; i < aggrs.length; i++) {
+                    col = metrics.nextSetBit(col + 1);
+                    aggrs[i] = info.codeSystem.newMetricsAggregator(col);
+                }
+                // the key is a copy, the incoming record itself is not retained
+                aggBufMap.put(r.copy(dimensions), aggrs);
+            }
+
+            for (int i = 0, col = -1; i < aggrs.length; i++) {
+                col = metrics.nextSetBit(col + 1);
+                // local is deliberately not named "metrics": that would shadow the BitSet field used above
+                Object value = info.codeSystem.decodeColumnValue(col, r.cols[col].wrapAsBuffer());
+                aggrs[i].aggregate(value);
+            }
+        }
+
+        /**
+         * Iterates the aggregated groups in map order. The iterator returns the SAME
+         * {@link GTRecord} instance on every {@code next()}, refilled each time, and its
+         * metrics columns point into a buffer that is reset on every {@code next()}; a
+         * caller that needs to keep a record must copy it before advancing.
+         */
+        public Iterator<GTRecord> iterator() {
+            return new Iterator<GTRecord>() {
+
+                Iterator<Entry<GTRecord, MeasureAggregator[]>> it = aggBufMap.entrySet().iterator();
+                ByteBuffer metricsBuf = ByteBuffer.allocate(info.maxRecordLength);
+                GTRecord oneRecord = new GTRecord(info); // avoid instance creation
+
+                @Override
+                public boolean hasNext() {
+                    return it.hasNext();
+                }
+
+                @Override
+                public GTRecord next() {
+                    Entry<GTRecord, MeasureAggregator[]> entry = it.next();
+
+                    // dimension columns come from the map key
+                    GTRecord dims = entry.getKey();
+                    for (int i = dimensions.nextSetBit(0); i >= 0; i = dimensions.nextSetBit(i + 1)) {
+                        oneRecord.cols[i].set(dims.cols[i]);
+                    }
+
+                    // metrics columns are encoded back to back into metricsBuf and each
+                    // column is pointed at its slice of the backing array
+                    metricsBuf.clear();
+                    MeasureAggregator[] aggrs = entry.getValue();
+                    for (int i = 0, col = -1; i < aggrs.length; i++) {
+                        col = metrics.nextSetBit(col + 1);
+                        int pos = metricsBuf.position();
+                        info.codeSystem.encodeColumnValue(col, aggrs[i].getState(), metricsBuf);
+                        oneRecord.cols[col].set(metricsBuf.array(), pos, metricsBuf.position() - pos);
+                    }
+
+                    return oneRecord;
+                }
+
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            };
+        }
+
+        /** Number of distinct dimension groups currently buffered. */
+        public long getSize() {
+            return aggBufMap.size();
+        }
+
+        // ============================================================================
+
+        transient int rowMemBytes;
+        static final int MEMORY_USAGE_CAP = 500 * 1024 * 1024; // 500 MB
+
+        /**
+         * Estimates the memory held by the buffer as {@code (40 + rowMemBytes) * size} and
+         * fails when the estimate exceeds {@link #MEMORY_USAGE_CAP}. {@code rowMemBytes} is
+         * computed once, from the aggregators of the first map entry.
+         * <p>
+         * NOTE(review): nothing in this class calls this method; a caller has to invoke it
+         * for the cap to be enforced.
+         *
+         * @throws RuntimeException if the estimated usage is beyond the cap
+         */
+        public void checkMemoryUsage() {
+            // about memory calculation,
+            // http://seniorjava.wordpress.com/2013/09/01/java-objects-memory-size-reference/
+            if (rowMemBytes <= 0) {
+                if (aggBufMap.size() > 0) {
+                    rowMemBytes = 0;
+                    MeasureAggregator[] measureAggregators = aggBufMap.get(aggBufMap.firstKey());
+                    for (MeasureAggregator agg : measureAggregators) {
+                        rowMemBytes += agg.getMemBytes();
+                    }
+                }
+            }
+            int size = aggBufMap.size();
+            // Multiply in long: an int product wraps around for large maps, can turn
+            // negative, and would then slip past the cap check below.
+            long memUsage = (40L + rowMemBytes) * size;
+            if (memUsage > MEMORY_USAGE_CAP) {
+                throw new RuntimeException("Kylin coprocess memory usage goes beyond cap, (40 + " + rowMemBytes + ") * " + size + " > " + MEMORY_USAGE_CAP + ". Abord coprocessor.");
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java
index 0083b86..7e305c5 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java
@@ -3,21 +3,21 @@ package org.apache.kylin.storage.gridtable;
 import java.io.Closeable;
 import java.io.Flushable;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
-import org.apache.kylin.storage.gridtable.GTStore.GTWriter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.storage.gridtable.IGTStore.IGTStoreWriter;
 
 public class GTBuilder implements Closeable, Flushable {
 
     final private GTInfo info;
-    final private GTWriter writer;
+    final private IGTStoreWriter writer;
     
     private GTRowBlock block;
 
-    public GTBuilder(GTInfo info, int shard, GTStore store) {
+    GTBuilder(GTInfo info, int shard, IGTStore store) {
         this.info = info;
         this.writer = store.rebuild(shard);
-        this.block = new GTRowBlock(info);
+        this.block = GTRowBlock.allocate(info);
     }
 
     public void write(GTRecord r) throws IOException {
@@ -25,11 +25,8 @@ public class GTBuilder implements Closeable, Flushable {
         if (block.isEmpty()) {
             makePrimaryKey(r, block.primaryKey);
         }
-        for (int c = 0; c < info.nColBlocks; c++) {
-            ByteBuffer cellBuf = block.cellBlocks[c];
-            for (int i = info.colBlockCuts[c], end = info.colBlockCuts[c + 1]; i < end; i++) {
-                append(cellBuf, r.cols[i]);
-            }
+        for (int i = 0; i < info.colBlocks.length; i++) {
+            r.exportColumnBlock(i, block.cellBlockBuffers[i]);
         }
         
         block.nRows++;
@@ -38,24 +35,15 @@ public class GTBuilder implements Closeable, Flushable {
         }
     }
     
-    private void makePrimaryKey(GTRecord r, ByteBuffer buf) {
-        buf.clear();
-        
-        for (int i : info.primaryKey) {
-            append(buf, r.cols[i]);
-        }
-        
-        buf.flip();
-    }
-
-    private void append(ByteBuffer buf, ByteBuffer data) {
-        buf.put(data.array(), data.arrayOffset(), data.limit());
+    private void makePrimaryKey(GTRecord r, ByteArray buf) {
+        r.exportColumns(info.primaryKey, buf);
     }
 
     @Override
     public void flush() throws IOException {
         writer.write(block);
         block.clear();
+        block.seqId++;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/storage/src/main/java/org/apache/kylin/storage/gridtable/GTCodeSystem.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTCodeSystem.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTCodeSystem.java
new file mode 100644
index 0000000..f9405be
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTCodeSystem.java
@@ -0,0 +1,76 @@
+package org.apache.kylin.storage.gridtable;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.metadata.filter.IFilterCodeSystem;
+import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.apache.kylin.metadata.serializer.DataTypeSerializer;
+
+/**
+ * An {@link IGTCodeSystem} that holds one {@link DataTypeSerializer} per column, created
+ * from the column data types of a {@link GTInfo}, plus a filter code system that works
+ * on {@link ByteArray} codes.
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class GTCodeSystem implements IGTCodeSystem {
+
+    final private GTInfo info;
+    final private DataTypeSerializer[] serializers;
+    final private IFilterCodeSystem<ByteArray> filterCS;
+
+    public GTCodeSystem(GTInfo info) {
+        this.info = info;
+        this.serializers = createSerializers(info);
+        this.filterCS = createFilterCodeSystem();
+    }
+
+    // one serializer per column, picked by the column's data type
+    private static DataTypeSerializer[] createSerializers(GTInfo info) {
+        DataTypeSerializer[] result = new DataTypeSerializer[info.nColumns];
+        for (int col = 0; col < info.nColumns; col++) {
+            result[col] = DataTypeSerializer.create(info.colTypes[col]);
+        }
+        return result;
+    }
+
+    // code system used by filters: codes are raw byte arrays
+    private static IFilterCodeSystem<ByteArray> createFilterCodeSystem() {
+        return new IFilterCodeSystem<ByteArray>() {
+            /** A code is null when it is absent or has zero length. */
+            @Override
+            public boolean isNull(ByteArray code) {
+                if (code == null)
+                    return true;
+                return code.length() == 0;
+            }
+
+            /** Delegates to {@code ByteArray.compareTo}; neither argument is null-checked. */
+            @Override
+            public int compare(ByteArray code1, ByteArray code2) {
+                return code1.compareTo(code2);
+            }
+
+            @Override
+            public void serialize(ByteArray code, ByteBuffer buffer) {
+                BytesUtil.writeByteArray(code.array(), code.offset(), code.length(), buffer);
+            }
+
+            @Override
+            public ByteArray deserialize(ByteBuffer buffer) {
+                return new ByteArray(BytesUtil.readByteArray(buffer));
+            }
+        };
+    }
+
+    /** Asks the serializer of column {@code col} for the length of the next code in {@code buf}. */
+    @Override
+    public int codeLength(int col, ByteBuffer buf) {
+        DataTypeSerializer serializer = serializers[col];
+        return serializer.peekLength(buf);
+    }
+
+    @Override
+    public IFilterCodeSystem<ByteArray> getFilterCodeSystem() {
+        return filterCS;
+    }
+
+    // ============================================================================
+
+    /** Creates an aggregator from the aggregation function name and data type name of column {@code col}. */
+    @Override
+    public MeasureAggregator<?> newMetricsAggregator(int col) {
+        String funcName = info.colMetricsAggrFunc[col];
+        return MeasureAggregator.create(funcName, info.colTypes[col].getName());
+    }
+
+    /** Serializes {@code value} into {@code buf} with the serializer of column {@code col}. */
+    @Override
+    public void encodeColumnValue(int col, Object value, ByteBuffer buf) {
+        DataTypeSerializer serializer = serializers[col];
+        serializer.serialize(value, buf);
+    }
+
+    /** Deserializes one value from {@code buf} with the serializer of column {@code col}. */
+    @Override
+    public Object decodeColumnValue(int col, ByteBuffer buf) {
+        DataTypeSerializer serializer = serializers[col];
+        return serializer.deserialize(buf);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4e290fb0/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
index 11a3083..1e2ddd0 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
@@ -1,52 +1,190 @@
 package org.apache.kylin.storage.gridtable;
 
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+
 import org.apache.kylin.metadata.model.DataType;
 
 public class GTInfo {
 
+    IGTCodeSystem codeSystem;
+    int maxRecordLength = 1024; // column length can vary
+
     // column schema
     int nColumns;
-    DataType[] colType;
-    boolean[] colIsMetrics;
-    int maxRecordLength; // column length can vary
-    
+    DataType[] colTypes;
+    BitSet colIsDimension;
+    BitSet colIsMetrics;
+    BitSet colAll;
+    String[] colMetricsAggrFunc;
+
     // grid info
-    int[] primaryKey; // columns sorted and unique
-    int rowBlockSize; // 0: no row block
-    int nColBlocks;
-    int[] colBlockCuts; // [i]: the start index of i.th block; [i+1]: the exclusive end index of i.th block
-    
+    BitSet primaryKey; // columns sorted and unique
+    BitSet[] colBlocks; // at least one column block
+    int rowBlockSize; // 0: disable row block
+    boolean rowBlockIndexEnabled;
+
     // sharding & rowkey
     int nShards; // 0: no sharding
-    byte[] rowkeyPrefix;
-    
+
+    // must create from builder
+    private GTInfo() {
+    }
+
     public boolean isShardingEnabled() {
         return nShards > 0;
     }
-    
+
     public boolean isRowBlockEnabled() {
         return rowBlockSize > 0;
     }
-    
+
+    public boolean isRowBlockIndexEnabled() {
+        return rowBlockIndexEnabled && isRowBlockEnabled();
+    }
+
     void validate() {
-        if (nColBlocks != colBlockCuts.length - 1) {
+
+        if (codeSystem == null)
+            throw new IllegalStateException();
+
+        if (primaryKey.cardinality() == 0)
             throw new IllegalStateException();
+
+        validateColumns();
+        validateColumnBlocks();
+    }
+
+    /**
+     * Rebuilds {@code colAll} as the bit set of column indexes {@code [0, nColumns)} and
+     * checks the column classification and the column blocks.
+     *
+     * @throws IllegalStateException if a column is both dimension and metrics, if a column
+     *             in {@code colAll} is neither, or if two column blocks share a column
+     */
+    private void validateColumns() {
+        colAll = new BitSet();
+        colAll.set(0, nColumns);
+
+        // dimensions and metrics must be disjoint ...
+        if (colIsDimension.intersects(colIsMetrics))
+            throw new IllegalStateException();
+        // ... and together they must cover all columns
+        BitSet unclassified = (BitSet) colAll.clone();
+        unclassified.andNot(colIsDimension);
+        unclassified.andNot(colIsMetrics);
+        if (!unclassified.isEmpty())
+            throw new IllegalStateException();
+
+        // column blocks must be pairwise disjoint
+        for (int i = 0; i < colBlocks.length; i++) {
+            BitSet left = colBlocks[i];
+            for (int j = i + 1; j < colBlocks.length; j++) {
+                if (left.intersects(colBlocks[j]))
+                    throw new IllegalStateException();
+            }
+        }
+    }
+
+    /**
+     * Checks that the column blocks together equal {@code colAll}, then normalizes them:
+     * when row block is disabled the primary key columns are removed from every block,
+     * and blocks that end up empty are dropped. {@code colBlocks} is replaced by a new
+     * array, and the surviving {@code BitSet}s are modified in place.
+     * <p>
+     * Reads {@code colAll}, so it has to run after {@code validateColumns()}.
+     *
+     * @throws IllegalStateException if the union of the column blocks differs from {@code colAll}
+     */
+    private void validateColumnBlocks() {
+        // the union of all column blocks must be exactly the full column set
+        BitSet union = new BitSet();
+        for (BitSet cb : colBlocks) {
+            union.or(cb);
+        }
+        if (!union.equals(colAll))
+            throw new IllegalStateException();
+
+        // With row block disabled, every row is its own row block, so the row PK is the
+        // row block PK and need not be repeated in the column blocks. See <code>GTRowBlock</code>.
+        if (!isRowBlockEnabled()) {
+            for (BitSet cb : colBlocks) {
+                cb.andNot(primaryKey);
+            }
+        }
+
+        // keep only the non-empty column blocks, in their original order
+        LinkedList<BitSet> nonEmpty = new LinkedList<BitSet>();
+        Iterator<BitSet> it = Arrays.asList(colBlocks).iterator();
+        while (it.hasNext()) {
+            BitSet cb = it.next();
+            if (!cb.isEmpty())
+                nonEmpty.add(cb);
+        }
+        colBlocks = nonEmpty.toArray(new BitSet[nonEmpty.size()]);
+    }
+
+    /** Returns true when column {@code i} is not flagged in {@code colIsMetrics}. */
+    public boolean isDimension(int i) {
+        return !colIsMetrics.get(i);
+    }
+
+    /** Returns true when column {@code i} is flagged in {@code colIsMetrics}. */
+    public boolean isMetrics(int i) {
+        return colIsMetrics.get(i);
+    }
+
+    public static class Builder {
+        final GTInfo info;
+
+        public Builder() {
+            this.info = new GTInfo();
+        }
+
+        /** required */
+        public Builder setCodeSystem(IGTCodeSystem cs) {
+            info.codeSystem = cs;
+            return this;
+        }
+
+        /** required */
+        public Builder setColumns(DataType[] colTypes, BitSet metrics) {
+            info.nColumns = colTypes.length;
+            info.colTypes = colTypes;
+            if (info.colBlocks == null) {
+                BitSet all = new BitSet();
+                all.flip(0, info.nColumns);
+                info.colBlocks = new BitSet[] { all };
+            }
+            return this;
         }
         
-        for (int i = 0; i < nColBlocks; i++) {
-            int cbStart = colBlockCuts[i], cbEnd = colBlockCuts[i + 1];
-            if (cbStart >= cbEnd)
-                throw new IllegalStateException();
-            
-            // when row block disabled, primary key is persisted on rowkey, thus shall not repeat in column block
-            if (isRowBlockEnabled() == false) {
-                for (int ii : primaryKey) {
-                    if (ii >= cbStart && ii < cbEnd)
-                        throw new IllegalStateException();
+        public Builder setMetrics(Map<Integer, String> metricsAggrFunc) {
+            info.colIsDimension = new BitSet();
+            info.colIsMetrics = new BitSet();
+            info.colMetricsAggrFunc = new String[info.nColumns];
+            for (int i = 0; i < info.nColumns; i++) {
+                if (metricsAggrFunc.containsKey(i)) {
+                    info.colIsMetrics.set(i);
+                    info.colMetricsAggrFunc[i] = metricsAggrFunc.get(i);
+                } else {
+                    info.colIsDimension.set(i);
                 }
             }
+            return this;
+        }
+
+        /** required */
+        public Builder setPrimaryKey(BitSet primaryKey) {
+            info.primaryKey = primaryKey;
+            return this;
+        }
+
+        /** optional */
+        public Builder setMaxRecordLength(int len) {
+            info.maxRecordLength = len;
+            return this;
+        }
+
+        /** optional */
+        public Builder enableColumnBlock(BitSet[] columnBlocks) {
+            info.colBlocks = columnBlocks;
+            return this;
         }
         
+        /** optional */
+        public Builder enableRowBlock(int rowBlockSize, boolean enableIndex) {
+            info.rowBlockSize = rowBlockSize;
+            info.rowBlockIndexEnabled = enableIndex;
+            return this;
+        }
+        
+        /** optional */
+        public Builder enableSharding(int nShards) {
+            info.nShards = nShards;
+            return this;
+        }
     }
-    
 }