You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/24 01:20:11 UTC

[03/28] incubator-kylin git commit: KYLIN-875 Split job module into 'core-job', 'engine-mr', 'source-hive', 'storage-hbase'. The old job remains as an assembly project.

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapperTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapperTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapperTest.java
new file mode 100644
index 0000000..97fe4ac
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapperTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.apache.hadoop.mrunit.types.Pair;
+import org.apache.kylin.engine.mr.steps.RangeKeyDistributionMapper;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * MRUnit tests for {@link RangeKeyDistributionMapper}.
+ * @author ysong1
+ */
+public class RangeKeyDistributionMapperTest {
+
+    @SuppressWarnings("rawtypes")
+    MapDriver mapDriver;
+    String localTempDir = System.getProperty("java.io.tmpdir") + File.separator;
+
+    @Before
+    public void setUp() {
+        RangeKeyDistributionMapper mapper = new RangeKeyDistributionMapper();
+        mapDriver = MapDriver.newMapDriver(mapper);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testMapperWithoutHeader() throws IOException {
+        // Expects one output: the last input key and 147, which equals 7 records x (18-byte key + 3-byte value).
+        Text inputKey1 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
+        Text inputKey2 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 122, 1, 0, 22, 98, 1, 0, 121, 7 });
+        Text inputKey3 = new Text(new byte[] { 2, 2, 2, 2, 2, 2, 2, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
+        Text inputKey4 = new Text(new byte[] { 3, 3, 3, 3, 3, 3, 3, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
+        Text inputKey5 = new Text(new byte[] { 4, 4, 4, 4, 4, 4, 4, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
+        Text inputKey6 = new Text(new byte[] { 5, 5, 5, 5, 5, 5, 5, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
+        Text inputKey7 = new Text(new byte[] { 6, 6, 6, 6, 6, 6, 6, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
+
+        mapDriver.addInput(inputKey1, new Text("abc"));
+        mapDriver.addInput(inputKey2, new Text("abc"));
+        mapDriver.addInput(inputKey3, new Text("abc"));
+        mapDriver.addInput(inputKey4, new Text("abc"));
+        mapDriver.addInput(inputKey5, new Text("abc"));
+        mapDriver.addInput(inputKey6, new Text("abc"));
+        mapDriver.addInput(inputKey7, new Text("abc"));
+
+        List<Pair<Text, LongWritable>> result = mapDriver.run();
+
+        assertEquals(1, result.size());
+        // Text.getBytes() exposes the backing array (valid only up to getLength()); copyBytes() is exact-length.
+        byte[] key1 = result.get(0).getFirst().copyBytes();
+        LongWritable value1 = result.get(0).getSecond();
+        assertArrayEquals(new byte[] { 6, 6, 6, 6, 6, 6, 6, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 }, key1);
+        assertEquals(147, value1.get());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testMapperWithHeader() throws IOException {
+        // Same scenario with 36-byte keys (a shared 18-byte prefix); 273 equals 7 records x (36 + 3) bytes.
+        Text inputKey1 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
+        Text inputKey2 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 0, 0, 0, 0, 0, 0, 0, 127, 11, 122, 1, 0, 22, 98, 1, 0, 121, 7 });
+        Text inputKey3 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 2, 2, 2, 2, 2, 2, 2, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
+        Text inputKey4 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 3, 3, 3, 3, 3, 3, 3, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
+        Text inputKey5 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 4, 4, 4, 4, 4, 4, 4, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
+        Text inputKey6 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 5, 5, 5, 5, 5, 5, 5, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
+        Text inputKey7 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 6, 6, 6, 6, 6, 6, 6, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
+
+        mapDriver.addInput(inputKey1, new Text("abc"));
+        mapDriver.addInput(inputKey2, new Text("abc"));
+        mapDriver.addInput(inputKey3, new Text("abc"));
+        mapDriver.addInput(inputKey4, new Text("abc"));
+        mapDriver.addInput(inputKey5, new Text("abc"));
+        mapDriver.addInput(inputKey6, new Text("abc"));
+        mapDriver.addInput(inputKey7, new Text("abc"));
+
+        List<Pair<Text, LongWritable>> result = mapDriver.run();
+
+        assertEquals(1, result.size());
+        // Compare only the valid bytes: copyBytes() trims to getLength(), unlike the raw getBytes() array.
+        byte[] key1 = result.get(0).getFirst().copyBytes();
+        LongWritable value1 = result.get(0).getSecond();
+        assertArrayEquals(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 6, 6, 6, 6, 6, 6, 6, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 }, key1);
+        assertEquals(273, value1.get());
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducerTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducerTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducerTest.java
new file mode 100644
index 0000000..cbf0657
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducerTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
+import org.apache.kylin.engine.mr.steps.RangeKeyDistributionReducer;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * MRUnit test scaffold for {@link RangeKeyDistributionReducer}; no assertions are implemented yet.
+ * @author ysong1
+ */
+public class RangeKeyDistributionReducerTest {
+
+    ReduceDriver<Text, LongWritable, Text, LongWritable> reduceDriver;
+    String localTempDir = System.getProperty("java.io.tmpdir") + File.separator;
+
+    @Before
+    public void setUp() {
+        RangeKeyDistributionReducer reducer = new RangeKeyDistributionReducer();
+        reduceDriver = ReduceDriver.newReduceDriver(reducer);
+    }
+
+    @Test
+    public void testReducer() throws IOException {
+        // TODO: feed reduceDriver with inputs and assert on its output; until then this test passes vacuously.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/pom.xml
----------------------------------------------------------------------
diff --git a/storage/pom.xml b/storage/pom.xml
index e557363..efc1a4b 100644
--- a/storage/pom.xml
+++ b/storage/pom.xml
@@ -40,11 +40,6 @@
             <artifactId>kylin-storage-hbase</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
-        <dependency>
-            <groupId>net.sf.ehcache</groupId>
-            <artifactId>ehcache</artifactId>
-            <version>2.8.1</version>
-        </dependency>
 
         <!-- Env & Test -->
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/ICachableStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/ICachableStorageQuery.java b/storage/src/main/java/org/apache/kylin/storage/ICachableStorageQuery.java
deleted file mode 100644
index 179202e..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/ICachableStorageQuery.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package org.apache.kylin.storage;
-
-import com.google.common.collect.Range;
-
-/**
- */
-public interface ICachableStorageQuery extends IStorageQuery {
-    /**
-     *
-     * being dynamic => getVolatilePeriod() return not null
-     * being dynamic => partition column of its realization not null
-     *
-     * @return true for static storage like cubes
-     *          false for dynamic storage like II
-     */
-    boolean isDynamic();
-
-    /**
-     * volatile period is the period of time in which the returned data is not stable
-     * e.g. inverted index's last several minutes' data is dynamic as time goes by.
-     * data in this period cannot be cached
-     *
-     * This method should not be called before ITupleIterator.close() is called
-     *
-     * @return null if the underlying storage guarantees the data is static
-     */
-    Range<Long> getVolatilePeriod();
-
-    /**
-     * get the uuid for the realization that serves this query
-     */
-    String getStorageUUID();
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/IStorage.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/IStorage.java b/storage/src/main/java/org/apache/kylin/storage/IStorage.java
deleted file mode 100644
index 89b96e9..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/IStorage.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage;
-
-import org.apache.kylin.metadata.realization.IRealization;
-
-public interface IStorage {
-    
-    public IStorageQuery createStorageQuery(IRealization realization);
-
-    public <I> I adaptToBuildEngine(Class<I> engineInterface);
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/IStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/IStorageQuery.java b/storage/src/main/java/org/apache/kylin/storage/IStorageQuery.java
deleted file mode 100644
index f090ebb..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/IStorageQuery.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage;
-
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.storage.tuple.TupleInfo;
-
-/**
- * 
- * @author xjiang
- * 
- */
-public interface IStorageQuery {
-
-    ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo);
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/storage/src/main/java/org/apache/kylin/storage/StorageContext.java
deleted file mode 100644
index caa2439..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.metadata.model.MeasureDesc;
-
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-
-/**
- * @author xjiang
- */
-public class StorageContext {
-
-    public static final int HARD_THRESHOLD = 4000000;
-    public static final int DEFAULT_THRESHOLD = 1000000;
-
-    public enum OrderEnum {
-        ASCENDING, DESCENDING
-    }
-
-    private String connUrl;
-    private int threshold;
-    private int limit;
-    private boolean hasSort;
-    private List<MeasureDesc> sortMeasures;
-    private List<OrderEnum> sortOrders;
-    private boolean acceptPartialResult;
-
-    private boolean exactAggregation;
-    private boolean enableLimit;
-    private boolean enableCoprocessor;
-
-    private long totalScanCount;
-    private Cuboid cuboid;
-    private boolean partialResultReturned;
-
-    public StorageContext() {
-        this.threshold = DEFAULT_THRESHOLD;
-        this.limit = DEFAULT_THRESHOLD;
-        this.totalScanCount = 0;
-        this.cuboid = null;
-        this.hasSort = false;
-        this.sortOrders = new ArrayList<OrderEnum>();
-        this.sortMeasures = new ArrayList<MeasureDesc>();
-
-        this.exactAggregation = false;
-        this.enableLimit = false;
-        this.enableCoprocessor = false;
-
-        this.acceptPartialResult = false;
-        this.partialResultReturned = false;
-    }
-
-    public String getConnUrl() {
-        return connUrl;
-    }
-
-    public void setConnUrl(String connUrl) {
-        this.connUrl = connUrl;
-    }
-
-    public int getThreshold() {
-        return threshold;
-    }
-
-    public void setThreshold(int t) {
-        threshold = Math.min(t, HARD_THRESHOLD);
-    }
-
-    public int getLimit() {
-        return limit;
-    }
-
-    public void setLimit(int l) {
-        this.limit = l;
-    }
-
-    public void enableLimit() {
-        this.enableLimit = true;
-    }
-
-    public boolean isLimitEnabled() {
-        return this.enableLimit;
-    }
-
-    public void addSort(MeasureDesc measure, OrderEnum order) {
-        if (measure != null) {
-            sortMeasures.add(measure);
-            sortOrders.add(order);
-        }
-    }
-
-    public void markSort() {
-        this.hasSort = true;
-    }
-
-    public boolean hasSort() {
-        return this.hasSort;
-    }
-
-    public void setCuboid(Cuboid c) {
-        cuboid = c;
-    }
-
-    public Cuboid getCuboid() {
-        return cuboid;
-    }
-
-    public long getTotalScanCount() {
-        return totalScanCount;
-    }
-
-    public void setTotalScanCount(long totalScanCount) {
-        this.totalScanCount = totalScanCount;
-    }
-
-    public boolean isAcceptPartialResult() {
-        return acceptPartialResult;
-    }
-
-    public void setAcceptPartialResult(boolean acceptPartialResult) {
-        this.acceptPartialResult = acceptPartialResult;
-    }
-
-    public boolean isPartialResultReturned() {
-        return partialResultReturned;
-    }
-
-    public void setPartialResultReturned(boolean partialResultReturned) {
-        this.partialResultReturned = partialResultReturned;
-    }
-
-    public void setExactAggregation(boolean isExactAggregation) {
-        this.exactAggregation = isExactAggregation;
-    }
-
-    public boolean isExactAggregation() {
-        return this.exactAggregation;
-    }
-
-    public void enableCoprocessor() {
-        this.enableCoprocessor = true;
-    }
-
-    public boolean isCoprocessorEnabled() {
-        return this.enableCoprocessor;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
deleted file mode 100644
index 2e07c24..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
+++ /dev/null
@@ -1,287 +0,0 @@
-package org.apache.kylin.storage.cube;
-
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Map;
-
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.measure.MeasureAggregator;
-import org.apache.kylin.metadata.measure.serializer.DataTypeSerializer;
-import org.apache.kylin.storage.gridtable.GTInfo;
-import org.apache.kylin.storage.gridtable.IGTCodeSystem;
-import org.apache.kylin.storage.gridtable.IGTComparator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Created by shaoshi on 3/23/15.
- * This implementation uses Dictionary to encode and decode the table; If a column doesn't have dictionary, will check
- * its data type to serialize/deserialize it;
- */
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class CubeCodeSystem implements IGTCodeSystem {
-    private static final Logger logger = LoggerFactory.getLogger(CubeCodeSystem.class);
-
-    // ============================================================================
-
-    private GTInfo info;
-    private Map<Integer, Dictionary> dictionaryMap; // column index ==> dictionary of column
-    private Map<Integer, Integer> fixLenMap; // column index ==> fixed length of column
-    private Map<Integer, Integer> dependentMetricsMap;
-    private IGTComparator comparator;
-    private DataTypeSerializer[] serializers;
-
-    public CubeCodeSystem(Map<Integer, Dictionary> dictionaryMap) {
-        this(dictionaryMap, Collections.<Integer, Integer>emptyMap(), Collections.<Integer, Integer>emptyMap());
-    }
-            
-    public CubeCodeSystem(Map<Integer, Dictionary> dictionaryMap, Map<Integer, Integer> fixLenMap, Map<Integer, Integer> dependentMetricsMap) {
-        this.dictionaryMap = dictionaryMap;
-        this.fixLenMap = fixLenMap;
-        this.dependentMetricsMap = dependentMetricsMap;
-    }
-
-    @Override
-    public void init(GTInfo info) {
-        this.info = info;
-
-        serializers = new DataTypeSerializer[info.getColumnCount()];
-        for (int i = 0; i < info.getColumnCount(); i++) {
-            // dimension with dictionary
-            if (dictionaryMap.get(i) != null) {
-                serializers[i] = new DictionarySerializer(dictionaryMap.get(i));
-            }
-            // dimension of fixed length
-            else if (fixLenMap.get(i) != null) {
-                serializers[i] = new FixLenSerializer(fixLenMap.get(i));
-            }
-            // metrics
-            else {
-                serializers[i] = DataTypeSerializer.create(info.getColumnType(i));
-            }
-        }
-
-        this.comparator = new IGTComparator() {
-            @Override
-            public boolean isNull(ByteArray code) {
-                // all 0xff is null
-                byte[] array = code.array();
-                for (int i = 0, j = code.offset(), n = code.length(); i < n; i++, j++) {
-                    if (array[j] != Dictionary.NULL)
-                        return false;
-                }
-                return true;
-            }
-
-            @Override
-            public int compare(ByteArray code1, ByteArray code2) {
-                return code1.compareTo(code2);
-            }
-        };
-    }
-
-    @Override
-    public IGTComparator getComparator() {
-        return comparator;
-    }
-
-    @Override
-    public int codeLength(int col, ByteBuffer buf) {
-        return serializers[col].peekLength(buf);
-    }
-
-    @Override
-    public int maxCodeLength(int col) {
-        return serializers[col].maxLength();
-    }
-
-    @Override
-    public void encodeColumnValue(int col, Object value, ByteBuffer buf) {
-        encodeColumnValue(col, value, 0, buf);
-    }
-
-    @Override
-    public void encodeColumnValue(int col, Object value, int roundingFlag, ByteBuffer buf) {
-        // this is a bit too complicated, but encoding only happens once at build time, so it is OK
-        DataTypeSerializer serializer = serializers[col];
-        try {
-            if (serializer instanceof DictionarySerializer) {
-                ((DictionarySerializer) serializer).serializeWithRounding(value, roundingFlag, buf);
-            } else {
-                serializer.serialize(value, buf);
-            }
-        } catch (ClassCastException ex) {
-            // try convert string into a correct object type
-            try {
-                if (value instanceof String) {
-                    Object converted = serializer.valueOf((String) value);
-                    if ((converted instanceof String) == false) {
-                        encodeColumnValue(col, converted, roundingFlag, buf);
-                        return;
-                    }
-                }
-            } catch (Throwable e) {
-                logger.error("Fail to encode value '" + value + "'", e);
-            }
-            throw ex;
-        }
-    }
-
-    @Override
-    public Object decodeColumnValue(int col, ByteBuffer buf) {
-       return serializers[col].deserialize(buf);
-    }
-
-    @Override
-    public MeasureAggregator<?>[] newMetricsAggregators(ImmutableBitSet columns, String[] aggrFunctions) {
-        assert columns.trueBitCount() == aggrFunctions.length;
-        
-        MeasureAggregator<?>[] result = new MeasureAggregator[aggrFunctions.length];
-        for (int i = 0; i < result.length; i++) {
-            int col = columns.trueBitAt(i);
-            result[i] = MeasureAggregator.create(aggrFunctions[i], info.getColumnType(col).toString());
-        }
-        
-        // deal with holistic distinct count
-        if (dependentMetricsMap != null) {
-            for (Integer child : dependentMetricsMap.keySet()) {
-                if (columns.get(child)) {
-                    Integer parent = dependentMetricsMap.get(child);
-                    if (columns.get(parent) == false)
-                        throw new IllegalStateException();
-                    
-                    int childIdx = columns.trueBitIndexOf(child);
-                    int parentIdx = columns.trueBitIndexOf(parent);
-                    result[childIdx].setDependentAggregator(result[parentIdx]);
-                }
-            }
-        }
-        
-        return result;
-    }
-
-    static class DictionarySerializer extends DataTypeSerializer {
-        private Dictionary dictionary;
-
-        DictionarySerializer(Dictionary dictionary) {
-            this.dictionary = dictionary;
-        }
-
-        public void serializeWithRounding(Object value, int roundingFlag, ByteBuffer buf) {
-            int id = dictionary.getIdFromValue(value, roundingFlag);
-            BytesUtil.writeUnsigned(id, dictionary.getSizeOfId(), buf);
-        }
-
-        @Override
-        public void serialize(Object value, ByteBuffer buf) {
-            int id = dictionary.getIdFromValue(value);
-            BytesUtil.writeUnsigned(id, dictionary.getSizeOfId(), buf);
-        }
-
-        @Override
-        public Object deserialize(ByteBuffer in) {
-            int id = BytesUtil.readUnsigned(in, dictionary.getSizeOfId());
-            return dictionary.getValueFromId(id);
-        }
-
-        @Override
-        public int peekLength(ByteBuffer in) {
-            return dictionary.getSizeOfId();
-        }
-
-        @Override
-        public int maxLength() {
-            return dictionary.getSizeOfId();
-        }
-
-        @Override
-        public Object valueOf(byte[] value) {
-            throw new UnsupportedOperationException();
-        }
-    }
-
-    static class FixLenSerializer extends DataTypeSerializer {
-
-        // be thread-safe and avoid repeated obj creation
-        private ThreadLocal<byte[]> current = new ThreadLocal<byte[]>();
-
-        private int fixLen;
-
-        FixLenSerializer(int fixLen) {
-            this.fixLen = fixLen;
-        }
-        
-        private byte[] currentBuf() {
-            byte[] buf = current.get();
-            if (buf == null) {
-                buf = new byte[fixLen];
-                current.set(buf);
-            }
-            return buf;
-        }
-
-        @Override
-        public void serialize(Object value, ByteBuffer out) {
-            byte[] buf = currentBuf();
-            if (value == null) {
-                Arrays.fill(buf, Dictionary.NULL);
-                out.put(buf);
-            } else {
-                byte[] bytes = Bytes.toBytes(value.toString());
-                if (bytes.length > fixLen) {
-                    throw new IllegalStateException("Expect at most " + fixLen + " bytes, but got " + bytes.length + ", value string: " + value.toString());
-                }
-                out.put(bytes);
-                for (int i = bytes.length; i < fixLen; i++) {
-                    out.put(RowConstants.ROWKEY_PLACE_HOLDER_BYTE);
-                }
-            }
-        }
-
-        @Override
-        public Object deserialize(ByteBuffer in) {
-            byte[] buf = currentBuf();
-            in.get(buf);
-
-            int tail = fixLen;
-            while (tail > 0 && (buf[tail - 1] == RowConstants.ROWKEY_PLACE_HOLDER_BYTE || buf[tail - 1] == Dictionary.NULL)) {
-                tail--;
-            }
-            
-            if (tail == 0) {
-                return buf[0] == Dictionary.NULL ? null : "";
-            }
-            
-            return Bytes.toString(buf, 0, tail);
-        }
-
-        @Override
-        public int peekLength(ByteBuffer in) {
-            return fixLen;
-        }
-
-        @Override
-        public int maxLength() {
-            return fixLen;
-        }
-
-        @Override
-        public Object valueOf(byte[] value) {
-            try {
-                return new String(value, "UTF-8");
-            } catch (UnsupportedEncodingException e) {
-                // does not happen
-                throw new RuntimeException(e);
-            }
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/cube/CubeGridTable.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeGridTable.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeGridTable.java
deleted file mode 100644
index f792402..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeGridTable.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package org.apache.kylin.storage.cube;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.gridtable.GTInfo;
-
-import com.google.common.collect.Maps;
-
-@SuppressWarnings("rawtypes")
-public class CubeGridTable {
-
-    public static Map<TblColRef, Dictionary<?>> getDimensionToDictionaryMap(CubeSegment cubeSeg, long cuboidId) {
-        CubeDesc cubeDesc = cubeSeg.getCubeDesc();
-        CubeManager cubeMgr = CubeManager.getInstance(cubeSeg.getCubeInstance().getConfig());
-
-        // build a dictionary map
-        Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
-        List<TblColRef> dimCols = Cuboid.findById(cubeDesc, cuboidId).getColumns();
-        for (TblColRef col : dimCols) {
-            Dictionary<?> dictionary = cubeMgr.getDictionary(cubeSeg, col);
-            if (dictionary != null) {
-                dictionaryMap.put(col, dictionary);
-            }
-        }
-        return dictionaryMap;
-    }
-
-    public static GTInfo newGTInfo(CubeSegment cubeSeg, long cuboidId) {
-        Map<TblColRef, Dictionary<?>> dictionaryMap = getDimensionToDictionaryMap(cubeSeg, cuboidId);
-        return newGTInfo(cubeSeg.getCubeDesc(), cuboidId, dictionaryMap);
-    }
-    
-    public static GTInfo newGTInfo(CubeDesc cubeDesc, long cuboidId, Map<TblColRef, Dictionary<?>> dictionaryMap) {
-        Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
-        CuboidToGridTableMapping mapping = new CuboidToGridTableMapping(cuboid);
-        
-        Map<Integer, Dictionary> dictionaryByColIdx = Maps.newHashMap();
-        Map<Integer, Integer> fixLenByColIdx = Maps.newHashMap();
-
-        for (TblColRef dim : cuboid.getColumns()) {
-            int colIndex = mapping.getIndexOf(dim);
-            if (cubeDesc.getRowkey().isUseDictionary(dim)) {
-                Dictionary dict = dictionaryMap.get(dim);
-                dictionaryByColIdx.put(colIndex, dict);
-            } else {
-                int len = cubeDesc.getRowkey().getColumnLength(dim);
-                if (len == 0)
-                    throw new IllegalStateException();
-                
-                fixLenByColIdx.put(colIndex,  len);
-            }
-        }
-
-        GTInfo.Builder builder = GTInfo.builder();
-        builder.setTableName("Cuboid " + cuboidId);
-        builder.setCodeSystem(new CubeCodeSystem(dictionaryByColIdx, fixLenByColIdx, mapping.getDependentMetricsMap()));
-        builder.setColumns(mapping.getDataTypes());
-        builder.setPrimaryKey(mapping.getPrimaryKey());
-        builder.enableColumnBlock(mapping.getColumnBlocks());
-        return builder.build();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java
index e4b8657..8001fbd 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java
@@ -21,12 +21,12 @@ import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.HBaseColumnDesc;
 import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
 import org.apache.kylin.cube.model.HBaseMappingDesc;
-import org.apache.kylin.storage.gridtable.GTInfo;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.storage.gridtable.GTRowBlock;
-import org.apache.kylin.storage.gridtable.GTRowBlock.Writer;
-import org.apache.kylin.storage.gridtable.GTScanRequest;
-import org.apache.kylin.storage.gridtable.IGTStore;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTRowBlock;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.IGTStore;
+import org.apache.kylin.gridtable.GTRowBlock.Writer;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java
index 038df9d..6af83d1 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java
@@ -11,17 +11,19 @@ import java.util.Set;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.gridtable.CubeGridTable;
+import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRawScanner;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRange;
+import org.apache.kylin.gridtable.GTScanRangePlanner;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTUtil;
+import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.gridtable.GTInfo;
-import org.apache.kylin.storage.gridtable.GTRawScanner;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.storage.gridtable.GTScanRange;
-import org.apache.kylin.storage.gridtable.GTScanRangePlanner;
-import org.apache.kylin.storage.gridtable.GTScanRequest;
-import org.apache.kylin.storage.gridtable.GTUtil;
-import org.apache.kylin.storage.gridtable.IGTScanner;
 
 import com.google.common.collect.Lists;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/cube/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeTupleConverter.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeTupleConverter.java
index 7e64975..0657a69 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeTupleConverter.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeTupleConverter.java
@@ -29,10 +29,10 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.gridtable.GTRecord;
 import org.apache.kylin.storage.tuple.Tuple;
 import org.apache.kylin.storage.tuple.TupleInfo;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/cube/CuboidToGridTableMapping.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CuboidToGridTableMapping.java b/storage/src/main/java/org/apache/kylin/storage/cube/CuboidToGridTableMapping.java
deleted file mode 100644
index 0bf4573..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CuboidToGridTableMapping.java
+++ /dev/null
@@ -1,172 +0,0 @@
-package org.apache.kylin.storage.cube;
-
-import java.util.BitSet;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.metadata.model.DataType;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-public class CuboidToGridTableMapping {
-
-    final private Cuboid cuboid;
-
-    private List<DataType> gtDataTypes;
-    private List<ImmutableBitSet> gtColBlocks;
-
-    private int nDimensions;
-    private Map<TblColRef, Integer> dim2gt;
-    private ImmutableBitSet gtPrimaryKey;
-
-    private int nMetrics;
-    private ListMultimap<FunctionDesc, Integer> metrics2gt; // because count distinct may have a holistic version
-
-    public CuboidToGridTableMapping(Cuboid cuboid) {
-        this.cuboid = cuboid;
-        init();
-    }
-    
-    private void init() {
-        int gtColIdx = 0;
-        gtDataTypes = Lists.newArrayList();
-        gtColBlocks = Lists.newArrayList();
-
-        // dimensions
-        dim2gt = Maps.newHashMap();
-        BitSet pk = new BitSet();
-        for (TblColRef dimension : cuboid.getColumns()) {
-            gtDataTypes.add(dimension.getType());
-            dim2gt.put(dimension, gtColIdx);
-            pk.set(gtColIdx);
-            gtColIdx++;
-        }
-        gtPrimaryKey = new ImmutableBitSet(pk);
-        gtColBlocks.add(gtPrimaryKey);
-
-        nDimensions = gtColIdx;
-        assert nDimensions == cuboid.getColumns().size();
-
-        // metrics
-        metrics2gt = LinkedListMultimap.create();
-        for (HBaseColumnFamilyDesc familyDesc : cuboid.getCube().getHbaseMapping().getColumnFamily()) {
-            for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
-                BitSet colBlock = new BitSet();
-                for (MeasureDesc measure : hbaseColDesc.getMeasures()) {
-                    // Count distinct & holistic count distinct are equals() but different.
-                    // Ensure the holistic version if exists is always the first.
-                    FunctionDesc func = measure.getFunction();
-                    if (func.isHolisticCountDistinct()) {
-                        List<Integer> existing = metrics2gt.removeAll(func);
-                        metrics2gt.put(func, gtColIdx);
-                        metrics2gt.putAll(func, existing);
-                    } else {
-                        metrics2gt.put(func, gtColIdx);
-                    }
-                    gtDataTypes.add(func.getReturnDataType());
-                    colBlock.set(gtColIdx);
-                    gtColIdx++;
-                }
-                gtColBlocks.add(new ImmutableBitSet(colBlock));
-            }
-        }
-        nMetrics = gtColIdx - nDimensions;
-        assert nMetrics == cuboid.getCube().getMeasures().size();
-    }
-
-    public int getColumnCount() {
-        return nDimensions + nMetrics;
-    }
-
-    public int getDimensionCount() {
-        return nDimensions;
-    }
-
-    public int getMetricsCount() {
-        return nMetrics;
-    }
-
-    public DataType[] getDataTypes() {
-        return (DataType[]) gtDataTypes.toArray(new DataType[gtDataTypes.size()]);
-    }
-
-    public ImmutableBitSet getPrimaryKey() {
-        return gtPrimaryKey;
-    }
-
-    public ImmutableBitSet[] getColumnBlocks() {
-        return (ImmutableBitSet[]) gtColBlocks.toArray(new ImmutableBitSet[gtColBlocks.size()]);
-    }
-
-    public int getIndexOf(TblColRef dimension) {
-        Integer i = dim2gt.get(dimension);
-        return i == null ? -1 : i.intValue();
-    }
-
-    public int getIndexOf(FunctionDesc metric) {
-        List<Integer> list = metrics2gt.get(metric);
-        // normal case
-        if (list.size() == 1) {
-            return list.get(0);
-        }
-        // count distinct & its holistic version
-        else if (list.size() == 2) {
-            assert metric.isCountDistinct();
-            return metric.isHolisticCountDistinct() ? list.get(0) : list.get(1);
-        }
-        // unexpected
-        else
-            return -1;
-    }
-    
-    public List<TblColRef> getCuboidDimensionsInGTOrder() {
-        return cuboid.getColumns();
-    }
-    
-    public Map<Integer, Integer> getDependentMetricsMap() {
-        Map<Integer, Integer> result = Maps.newHashMap();
-        List<MeasureDesc> measures = cuboid.getCube().getMeasures();
-        for (MeasureDesc child : measures) {
-            if (child.getDependentMeasureRef() != null) {
-                boolean ok = false;
-                for (MeasureDesc parent : measures) {
-                    if (parent.getName().equals(child.getDependentMeasureRef())) {
-                        int childIndex = getIndexOf(child.getFunction());
-                        int parentIndex = getIndexOf(parent.getFunction());
-                        result.put(childIndex, parentIndex);
-                        ok = true;
-                        break;
-                    }
-                }
-                if (!ok)
-                    throw new IllegalStateException("Cannot find dependent measure: " + child.getDependentMeasureRef());
-            }
-        }
-        return result.isEmpty() ? Collections.<Integer, Integer>emptyMap() : result;
-    }
-
-    public static MeasureDesc[] getMeasureSequenceOnGridTable(CubeDesc cube) {
-        MeasureDesc[] result = new MeasureDesc[cube.getMeasures().size()];
-        int i = 0;
-        for (HBaseColumnFamilyDesc familyDesc : cube.getHbaseMapping().getColumnFamily()) {
-            for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
-                for (MeasureDesc m : hbaseColDesc.getMeasures()) {
-                    result[i++] = m;
-                }
-            }
-        }
-        return result;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/cube/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/SequentialCubeTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/cube/SequentialCubeTupleIterator.java
index 10ff129..366e3c3 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/SequentialCubeTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/SequentialCubeTupleIterator.java
@@ -7,11 +7,11 @@ import java.util.NoSuchElementException;
 import java.util.Set;
 
 import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.tuple.ITuple;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.storage.gridtable.GTRecord;
 import org.apache.kylin.storage.tuple.Tuple;
 import org.apache.kylin.storage.tuple.TupleInfo;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
deleted file mode 100644
index 25b217c..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
+++ /dev/null
@@ -1,268 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.SortedMap;
-
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.metadata.measure.HLLCAggregator;
-import org.apache.kylin.metadata.measure.LDCAggregator;
-import org.apache.kylin.metadata.measure.MeasureAggregator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Maps;
-
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class GTAggregateScanner implements IGTScanner {
-
-    @SuppressWarnings("unused")
-    private static final Logger logger = LoggerFactory.getLogger(GTAggregateScanner.class);
-
-    final GTInfo info;
-    final ImmutableBitSet dimensions; // dimensions to return, can be more than group by
-    final ImmutableBitSet groupBy;
-    final ImmutableBitSet metrics;
-    final String[] metricsAggrFuncs;
-    final IGTScanner inputScanner;
-    final AggregationCache aggrCache;
-
-    public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req) {
-        if (req.hasAggregation() == false)
-            throw new IllegalStateException();
-
-        this.info = inputScanner.getInfo();
-        this.dimensions = req.getColumns().andNot(req.getAggrMetrics());
-        this.groupBy = req.getAggrGroupBy();
-        this.metrics = req.getAggrMetrics();
-        this.metricsAggrFuncs = req.getAggrMetricsFuncs();
-        this.inputScanner = inputScanner;
-        this.aggrCache = new AggregationCache();
-    }
-
-    @Override
-    public GTInfo getInfo() {
-        return info;
-    }
-
-    @Override
-    public int getScannedRowCount() {
-        return inputScanner.getScannedRowCount();
-    }
-
-    @Override
-    public int getScannedRowBlockCount() {
-        return inputScanner.getScannedRowBlockCount();
-    }
-
-    @Override
-    public void close() throws IOException {
-        inputScanner.close();
-    }
-
-    @Override
-    public Iterator<GTRecord> iterator() {
-        for (GTRecord r : inputScanner) {
-            aggrCache.aggregate(r);
-        }
-        return aggrCache.iterator();
-    }
-
-    /** return the estimate memory size of aggregation cache */
-    public long getEstimateSizeOfAggrCache() {
-        return aggrCache.esitmateMemSize();
-    }
-
-    public Object[] getTotalSumForSanityCheck() {
-        return aggrCache.calculateTotalSumSanityCheck();
-    }
-
-    class AggregationCache {
-        final SortedMap<byte[], MeasureAggregator[]> aggBufMap;
-        final int keyLength;
-        final boolean[] compareMask;
-
-        public AggregationCache() {
-            compareMask = createCompareMask();
-            keyLength = compareMask.length;
-            aggBufMap = Maps.newTreeMap(new Comparator<byte[]>() {
-                @Override
-                public int compare(byte[] o1, byte[] o2) {
-                    int result = 0;
-                    // profiler shows this check is slow
-                    // Preconditions.checkArgument(keyLength == o1.length && keyLength == o2.length);
-                    for (int i = 0; i < keyLength; ++i) {
-                        if (compareMask[i]) {
-                            int a = (o1[i] & 0xff);
-                            int b = (o2[i] & 0xff);
-                            result = a - b;
-                            if (result == 0) {
-                                continue;
-                            } else {
-                                return result;
-                            }
-                        }
-                    }
-                    return result;
-                }
-            });
-        }
-
-        private boolean[] createCompareMask() {
-            int keyLength = 0;
-            for (int i = 0; i < dimensions.trueBitCount(); i++) {
-                int c = dimensions.trueBitAt(i);
-                int l = info.codeSystem.maxCodeLength(c);
-                keyLength += l;
-            }
-
-            boolean[] mask = new boolean[keyLength];
-            int p = 0;
-            for (int i = 0; i < dimensions.trueBitCount(); i++) {
-                int c = dimensions.trueBitAt(i);
-                int l = info.codeSystem.maxCodeLength(c);
-                boolean m = groupBy.get(c) ? true : false;
-                for (int j = 0; j < l; j++) {
-                    mask[p++] = m;
-                }
-            }
-            return mask;
-        }
-
-        private byte[] createKey(GTRecord record) {
-            byte[] result = new byte[keyLength];
-            int offset = 0;
-            for (int i = 0; i < dimensions.trueBitCount(); i++) {
-                int c = dimensions.trueBitAt(i);
-                final ByteArray byteArray = record.cols[c];
-                final int columnLength = info.codeSystem.maxCodeLength(c);
-                System.arraycopy(byteArray.array(), byteArray.offset(), result, offset, byteArray.length());
-                offset += columnLength;
-            }
-            assert offset == result.length;
-            return result;
-        }
-
-        void aggregate(GTRecord r) {
-            final byte[] key = createKey(r);
-            MeasureAggregator[] aggrs = aggBufMap.get(key);
-            if (aggrs == null) {
-                aggrs = newAggregators();
-                aggBufMap.put(key, aggrs);
-            }
-            for (int i = 0; i < aggrs.length; i++) {
-                int col = metrics.trueBitAt(i);
-                Object metrics = info.codeSystem.decodeColumnValue(col, r.cols[col].asBuffer());
-                aggrs[i].aggregate(metrics);
-            }
-        }
-
-        private MeasureAggregator[] newAggregators() {
-            return info.codeSystem.newMetricsAggregators(metrics, metricsAggrFuncs);
-        }
-
-        public Object[] calculateTotalSumSanityCheck() {
-            MeasureAggregator[] totalSum = newAggregators();
-
-            // skip expensive aggregation
-            for (int i = 0; i < totalSum.length; i++) {
-                if (totalSum[i] instanceof HLLCAggregator || totalSum[i] instanceof LDCAggregator)
-                    totalSum[i] = null;
-            }
-
-            for (MeasureAggregator[] entry : aggBufMap.values()) {
-                for (int i = 0; i < totalSum.length; i++) {
-                    if (totalSum[i] != null)
-                        totalSum[i].aggregate(entry[i].getState());
-                }
-            }
-            Object[] result = new Object[totalSum.length];
-            for (int i = 0; i < totalSum.length; i++) {
-                if (totalSum[i] != null)
-                    result[i] = totalSum[i].getState();
-            }
-            return result;
-        }
-
-        public long esitmateMemSize() {
-            if (aggBufMap.isEmpty())
-                return 0;
-
-            byte[] sampleKey = aggBufMap.firstKey();
-            MeasureAggregator<?>[] sampleValue = aggBufMap.get(sampleKey);
-            return estimateSizeOfAggrCache(sampleKey, sampleValue, aggBufMap.size());
-        }
-
-        public Iterator<GTRecord> iterator() {
-            return new Iterator<GTRecord>() {
-
-                final Iterator<Entry<byte[], MeasureAggregator[]>> it = aggBufMap.entrySet().iterator();
-
-                final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics));
-                final GTRecord secondRecord = new GTRecord(info);
-
-                @Override
-                public boolean hasNext() {
-                    return it.hasNext();
-                }
-
-                @Override
-                public GTRecord next() {
-                    Entry<byte[], MeasureAggregator[]> entry = it.next();
-                    create(entry.getKey(), entry.getValue());
-                    return secondRecord;
-                }
-
-                private void create(byte[] key, MeasureAggregator[] value) {
-                    int offset = 0;
-                    for (int i = 0; i < dimensions.trueBitCount(); i++) {
-                        int c = dimensions.trueBitAt(i);
-                        final int columnLength = info.codeSystem.maxCodeLength(c);
-                        secondRecord.set(c, new ByteArray(key, offset, columnLength));
-                        offset += columnLength;
-                    }
-                    metricsBuf.clear();
-                    for (int i = 0; i < value.length; i++) {
-                        int col = metrics.trueBitAt(i);
-                        int pos = metricsBuf.position();
-                        info.codeSystem.encodeColumnValue(col, value[i].getState(), metricsBuf);
-                        secondRecord.cols[col].set(metricsBuf.array(), pos, metricsBuf.position() - pos);
-                    }
-                }
-
-                @Override
-                public void remove() {
-                    throw new UnsupportedOperationException();
-                }
-            };
-        }
-    }
-
-    public static long estimateSizeOfAggrCache(byte[] keySample, MeasureAggregator<?>[] aggrSample, int size) {
-        // Aggregation cache is basically a tree map. The tree map entry overhead is
-        // - 40 according to http://java-performance.info/memory-consumption-of-java-data-types-2/
-        // - 41~52 according to AggregationCacheMemSizeTest
-        return (estimateSizeOf(keySample) + estimateSizeOf(aggrSample) + 64) * size;
-    }
-
-    public static long estimateSizeOf(MeasureAggregator[] aggrs) {
-        // size of array, AggregationCacheMemSizeTest reports 4 for [0], 12 for [1], 12 for [2], 20 for [3] etc..
-        // Memory alignment to 8 bytes
-        long est = (aggrs.length + 1) / 2 * 8 + 4 + (4 /* extra */);
-        for (MeasureAggregator aggr : aggrs) {
-            if (aggr != null)
-                est += aggr.getMemBytesEstimate();
-        }
-        return est;
-    }
-
-    public static long estimateSizeOf(byte[] bytes) {
-        // AggregationCacheMemSizeTest reports 20 for byte[10] and 20 again for byte[16]
-        // Memory alignment to 8 bytes
-        return (bytes.length + 7) / 8 * 8 + 4 + (4 /* extra */);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java
deleted file mode 100644
index 7552ab3..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import java.io.Closeable;
-import java.io.Flushable;
-import java.io.IOException;
-
-import org.apache.kylin.storage.gridtable.IGTStore.IGTStoreWriter;
-
-public class GTBuilder implements Closeable, Flushable {
-
-    @SuppressWarnings("unused")
-    final private GTInfo info;
-    final private IGTStoreWriter storeWriter;
-
-    final private GTRowBlock block;
-    final private GTRowBlock.Writer blockWriter;
-
-    private int writtenRowCount;
-    private int writtenRowBlockCount;
-
-    GTBuilder(GTInfo info, int shard, IGTStore store) throws IOException {
-        this(info, shard, store, false);
-    }
-
-    GTBuilder(GTInfo info, int shard, IGTStore store, boolean append) throws IOException {
-        this.info = info;
-
-        block = GTRowBlock.allocate(info);
-        blockWriter = block.getWriter();
-        if (append) {
-            storeWriter = store.append(shard, blockWriter);
-            if (block.isFull()) {
-                blockWriter.clearForNext();
-            }
-        } else {
-            storeWriter = store.rebuild(shard);
-        }
-    }
-
-    public void write(GTRecord r) throws IOException {
-        blockWriter.append(r);
-        writtenRowCount++;
-
-        if (block.isFull()) {
-            flush();
-        }
-    }
-
-    @Override
-    public void flush() throws IOException {
-        blockWriter.readyForFlush();
-        storeWriter.write(block);
-        writtenRowBlockCount++;
-        if (block.isFull()) {
-            blockWriter.clearForNext();
-        }
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (block.isEmpty() == false) {
-            flush();
-        }
-        storeWriter.close();
-    }
-
-    public int getWrittenRowCount() {
-        return writtenRowCount;
-    }
-
-    public int getWrittenRowBlockCount() {
-        return writtenRowBlockCount;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/gridtable/GTComboStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTComboStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTComboStore.java
deleted file mode 100644
index 42b577a..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTComboStore.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import java.io.IOException;
-
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.storage.gridtable.diskstore.GTDiskStore;
-import org.apache.kylin.storage.gridtable.memstore.GTSimpleMemStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class GTComboStore implements IGTStore {
-
-    private static final Logger logger = LoggerFactory.getLogger(GTComboStore.class);
-
-    private final GTInfo gtInfo;
-
-    private void convert(IGTStore input, IGTStore output) throws IOException {
-        final IGTStoreScanner scanner = input.scan(null, null, null, null);
-        final IGTStoreWriter writer = output.rebuild(-1);
-        while (scanner.hasNext()) {
-            writer.write(scanner.next());
-        }
-    }
-
-    private GTDiskStore gtDiskStore;
-    private GTSimpleMemStore gtSimpleMemStore;
-
-    public GTComboStore(GTInfo gtInfo) {
-        this(gtInfo, true);
-    }
-
-    public GTComboStore(GTInfo gtInfo, boolean useMemStore) {
-        this.gtInfo = gtInfo;
-        if (useMemStore) {
-            this.gtSimpleMemStore = new GTSimpleMemStore(gtInfo);
-        } else {
-            this.gtDiskStore = new GTDiskStore(gtInfo);
-        }
-    }
-    
-    @Override
-    public GTInfo getInfo() {
-        return gtInfo;
-    }
-
-    private IGTStore getCurrent() {
-        if (gtSimpleMemStore != null) {
-            return gtSimpleMemStore;
-        } else {
-            return gtDiskStore;
-        }
-    }
-    
-    public long memoryUsage() {
-        if (gtSimpleMemStore != null) {
-            return gtSimpleMemStore.memoryUsage();
-        } else {
-            return gtDiskStore.memoryUsage();
-        }
-    }
-
-    public void switchToMemStore() {
-        try {
-            if (gtSimpleMemStore == null) {
-                gtSimpleMemStore = new GTSimpleMemStore(gtInfo);
-                convert(gtDiskStore, gtSimpleMemStore);
-                gtDiskStore.drop();
-                gtDiskStore = null;
-            }
-        } catch (IOException e) {
-            logger.error("fail to switch to mem store", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void switchToDiskStore() {
-        try {
-            if (gtDiskStore == null) {
-                gtDiskStore = new GTDiskStore(gtInfo);
-                convert(gtSimpleMemStore, gtDiskStore);
-                gtSimpleMemStore.drop();
-                gtSimpleMemStore = null;
-            }
-        } catch (IOException e) {
-            logger.error("fail to switch to disk store", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public IGTStoreWriter rebuild(int shard) throws IOException {
-        return getCurrent().rebuild(shard);
-    }
-
-    @Override
-    public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException {
-        return getCurrent().append(shard, fillLast);
-    }
-
-    @Override
-    public IGTStoreScanner scan(GTRecord pkStart, GTRecord pkEnd, ImmutableBitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException {
-        return getCurrent().scan(pkStart, pkEnd, selectedColBlocks, additionalPushDown);
-    }
-
-    public void drop() throws IOException {
-        if (gtSimpleMemStore != null) {
-            gtSimpleMemStore.drop();
-        }
-        if (gtDiskStore != null) {
-            gtDiskStore.drop();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/gridtable/GTFilterScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTFilterScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTFilterScanner.java
deleted file mode 100644
index d60e4ba..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTFilterScanner.java
+++ /dev/null
@@ -1,100 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.metadata.filter.IFilterCodeSystem;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
-
-public class GTFilterScanner implements IGTScanner {
-
-    final private IGTScanner inputScanner;
-    final private TupleFilter filter;
-    final private IEvaluatableTuple oneTuple; // avoid instance creation
-    
-    private GTRecord next = null;
-
-    public GTFilterScanner(IGTScanner inputScanner, GTScanRequest req) throws IOException {
-        this.inputScanner = inputScanner;
-        this.filter = req.getFilterPushDown();
-        this.oneTuple = new IEvaluatableTuple() {
-            @Override
-            public Object getValue(TblColRef col) {
-                return next.get(col.getColumnDesc().getZeroBasedIndex());
-            }
-        };
-
-        if (TupleFilter.isEvaluableRecursively(filter) == false)
-            throw new IllegalArgumentException();
-    }
-
-    @Override
-    public GTInfo getInfo() {
-        return inputScanner.getInfo();
-    }
-
-    @Override
-    public int getScannedRowCount() {
-        return inputScanner.getScannedRowCount();
-    }
-
-    @Override
-    public int getScannedRowBlockCount() {
-        return inputScanner.getScannedRowBlockCount();
-    }
-
-    @Override
-    public void close() throws IOException {
-        inputScanner.close();
-    }
-
-    @Override
-    public Iterator<GTRecord> iterator() {
-        return new Iterator<GTRecord>() {
-            
-            private Iterator<GTRecord> inputIterator = inputScanner.iterator();
-
-            @Override
-            public boolean hasNext() {
-                if (next != null)
-                    return true;
-
-                IFilterCodeSystem<ByteArray> filterCodeSystem = GTUtil.wrap(getInfo().codeSystem.getComparator());
-
-                while (inputIterator.hasNext()) {
-                    next = inputIterator.next();
-                    if (filter != null && filter.evaluate(oneTuple, filterCodeSystem) == false) {
-                        continue;
-                    }
-                    return true;
-                }
-                next = null;
-                return false;
-            }
-
-            @Override
-            public GTRecord next() {
-                // fetch next record
-                if (next == null) {
-                    hasNext();
-                    if (next == null)
-                        throw new NoSuchElementException();
-                }
-
-                GTRecord result = next;
-                next = null;
-                return result;
-            }
-
-            @Override
-            public void remove() {
-                throw new UnsupportedOperationException();
-            }
-
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
deleted file mode 100644
index 1c69f8e..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
+++ /dev/null
@@ -1,246 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.metadata.model.DataType;
-import org.apache.kylin.metadata.model.TblColRef;
-
-public class GTInfo {
-
-    public static Builder builder() {
-        return new Builder();
-    }
-
-    String tableName;
-    IGTCodeSystem codeSystem;
-
-    // column schema
-    int nColumns;
-    DataType[] colTypes;
-    ImmutableBitSet colAll;
-    ImmutableBitSet colPreferIndex;
-    transient TblColRef[] colRefs;
-
-    // grid info
-    ImmutableBitSet primaryKey; // order by, uniqueness is not required
-    ImmutableBitSet[] colBlocks; // primary key must be the first column block
-    ImmutableBitSet colBlocksAll;
-    int rowBlockSize; // 0: disable row block
-
-    // sharding
-    int nShards; // 0: no sharding
-
-    // must create from builder
-    private GTInfo() {
-    }
-
-    public String getTableName() {
-        return tableName;
-    }
-
-    public IGTCodeSystem getCodeSystem() {
-        return codeSystem;
-    }
-
-    public int getColumnCount() {
-        return nColumns;
-    }
-
-    public DataType getColumnType(int i) {
-        return colTypes[i];
-    }
-
-    public ImmutableBitSet getPrimaryKey() {
-        return primaryKey;
-    }
-
-    public boolean isShardingEnabled() {
-        return nShards > 0;
-    }
-
-    public boolean isRowBlockEnabled() {
-        return rowBlockSize > 0;
-    }
-
-    public int getRowBlockSize() {
-        return rowBlockSize;
-    }
-
-    public int getMaxRecordLength() {
-        return getMaxColumnLength(colAll);
-    }
-
-    public int getMaxColumnLength(ImmutableBitSet selectedCols) {
-        int result = 0;
-        for (int i = 0; i < selectedCols.trueBitCount(); i++) {
-            int c = selectedCols.trueBitAt(i);
-            result += codeSystem.maxCodeLength(c);
-        }
-        return result;
-    }
-
-    public int getMaxColumnLength() {
-        int max = 0;
-        for (int i = 0; i < nColumns; i++)
-            max = Math.max(max, codeSystem.maxCodeLength(i));
-        return max;
-    }
-
-    public ImmutableBitSet selectColumnBlocks(ImmutableBitSet columns) {
-        if (columns == null)
-            columns = colAll;
-
-        BitSet result = new BitSet();
-        for (int i = 0; i < colBlocks.length; i++) {
-            ImmutableBitSet cb = colBlocks[i];
-            if (cb.intersects(columns)) {
-                result.set(i);
-            }
-        }
-        return new ImmutableBitSet(result);
-    }
-
-    public TblColRef colRef(int i) {
-        if (colRefs == null) {
-            colRefs = new TblColRef[nColumns];
-        }
-        if (colRefs[i] == null) {
-            colRefs[i] = GTUtil.tblColRef(i, colTypes[i].toString());
-        }
-        return colRefs[i];
-    }
-
-    public void validateColRef(TblColRef ref) {
-        TblColRef expected = colRef(ref.getColumnDesc().getZeroBasedIndex());
-        if (expected.equals(ref) == false)
-            throw new IllegalArgumentException();
-    }
-
-    void validate() {
-
-        if (codeSystem == null)
-            throw new IllegalStateException();
-
-        if (primaryKey.cardinality() == 0)
-            throw new IllegalStateException();
-
-        codeSystem.init(this);
-
-        validateColumnBlocks();
-    }
-
-    private void validateColumnBlocks() {
-        colAll = new ImmutableBitSet(0, nColumns);
-
-        if (colBlocks == null) {
-            colBlocks = new ImmutableBitSet[2];
-            colBlocks[0] = primaryKey;
-            colBlocks[1] = colAll.andNot(primaryKey);
-        }
-
-        colBlocksAll = new ImmutableBitSet(0, colBlocks.length);
-
-        if (colPreferIndex == null)
-            colPreferIndex = ImmutableBitSet.EMPTY;
-
-        // column blocks must not overlap
-        for (int i = 0; i < colBlocks.length; i++) {
-            for (int j = i + 1; j < colBlocks.length; j++) {
-                if (colBlocks[i].intersects(colBlocks[j]))
-                    throw new IllegalStateException();
-            }
-        }
-
-        // column block must cover all columns
-        ImmutableBitSet merge = ImmutableBitSet.EMPTY;
-        for (int i = 0; i < colBlocks.length; i++) {
-            merge = merge.or(colBlocks[i]);
-        }
-        if (merge.equals(colAll) == false)
-            throw new IllegalStateException();
-
-        // primary key must be the first column block
-        if (primaryKey.equals(colBlocks[0]) == false)
-            throw new IllegalStateException();
-
-        // drop empty column block
-        LinkedList<ImmutableBitSet> list = new LinkedList<ImmutableBitSet>(Arrays.asList(colBlocks));
-        Iterator<ImmutableBitSet> it = list.iterator();
-        while (it.hasNext()) {
-            ImmutableBitSet cb = it.next();
-            if (cb.isEmpty())
-                it.remove();
-        }
-        colBlocks = (ImmutableBitSet[]) list.toArray(new ImmutableBitSet[list.size()]);
-    }
-
-    public static class Builder {
-        final GTInfo info;
-
-        private Builder() {
-            this.info = new GTInfo();
-        }
-
-        /** optional */
-        public Builder setTableName(String name) {
-            info.tableName = name;
-            return this;
-        }
-
-        /** required */
-        public Builder setCodeSystem(IGTCodeSystem cs) {
-            info.codeSystem = cs;
-            return this;
-        }
-
-        /** required */
-        public Builder setColumns(DataType... colTypes) {
-            info.nColumns = colTypes.length;
-            info.colTypes = colTypes;
-            return this;
-        }
-
-        /** required */
-        public Builder setPrimaryKey(ImmutableBitSet primaryKey) {
-            info.primaryKey = primaryKey;
-            return this;
-        }
-
-        /** optional */
-        public Builder enableColumnBlock(ImmutableBitSet[] columnBlocks) {
-            info.colBlocks = new ImmutableBitSet[columnBlocks.length];
-            for (int i = 0; i < columnBlocks.length; i++) {
-                info.colBlocks[i] = columnBlocks[i];
-            }
-            return this;
-        }
-
-        /** optional */
-        public Builder enableRowBlock(int rowBlockSize) {
-            info.rowBlockSize = rowBlockSize;
-            return this;
-        }
-
-        /** optional */
-        public Builder enableSharding(int nShards) {
-            info.nShards = nShards;
-            return this;
-        }
-
-        /** optional */
-        public Builder setColumnPreferIndex(ImmutableBitSet colPreferIndex) {
-            info.colPreferIndex = colPreferIndex;
-            return this;
-        }
-
-        public GTInfo build() {
-            info.validate();
-            return info;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java
deleted file mode 100644
index ceb1463..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java
+++ /dev/null
@@ -1,205 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import it.uniroma3.mat.extendedset.intset.ConciseSet;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.LogicalTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-
-/**
- * A thread-safe inverted index of row blocks in memory.
- * 
- * Note function not() must return all blocks, because index only know what block contains a value,
- * but not sure what block does not contain a value.
- * 
- * @author yangli9
- */
-public class GTInvertedIndex {
-
-    private final GTInfo info;
-    private final ImmutableBitSet colPreferIndex;
-    private final ImmutableBitSet colBlocks;
-    private final GTInvertedIndexOfColumn[] index; // for each column
-
-    private volatile int nIndexedBlocks;
-
-    public GTInvertedIndex(GTInfo info) {
-        this.info = info;
-        this.colPreferIndex = info.colPreferIndex;
-        this.colBlocks = info.selectColumnBlocks(colPreferIndex);
-
-        index = new GTInvertedIndexOfColumn[info.getColumnCount()];
-        for (int i = 0; i < colPreferIndex.trueBitCount(); i++) {
-            int c = colPreferIndex.trueBitAt(i);
-            index[c] = new GTInvertedIndexOfColumn(info.codeSystem.getComparator());
-        }
-    }
-
-    public void add(GTRowBlock block) {
-
-        @SuppressWarnings("unchecked")
-        Set<ByteArray>[] distinctValues = new Set[info.getColumnCount()];
-        for (int i = 0; i < colPreferIndex.trueBitCount(); i++) {
-            int c = colPreferIndex.trueBitAt(i);
-            distinctValues[c] = new HashSet<ByteArray>();
-        }
-
-        GTRowBlock.Reader reader = block.getReader(colBlocks);
-        GTRecord record = new GTRecord(info);
-        while (reader.hasNext()) {
-            reader.fetchNext(record);
-            for (int i = 0; i < colPreferIndex.trueBitCount(); i++) {
-                int c = colPreferIndex.trueBitAt(i);
-                distinctValues[c].add(record.get(c));
-            }
-        }
-
-        for (int i = 0; i < colPreferIndex.trueBitCount(); i++) {
-            int c = colPreferIndex.trueBitAt(i);
-            index[c].add(distinctValues[c], block.getSequenceId());
-        }
-
-        nIndexedBlocks = Math.max(nIndexedBlocks, block.seqId + 1);
-    }
-
-    public ConciseSet filter(TupleFilter filter) {
-        return filter(filter, nIndexedBlocks);
-    }
-
-    public ConciseSet filter(TupleFilter filter, int totalBlocks) {
-        // number of indexed blocks may increase as we do evaluation
-        int indexedBlocks = nIndexedBlocks;
-
-        Evaluator evaluator = new Evaluator(indexedBlocks);
-        ConciseSet r = evaluator.evaluate(filter);
-
-        // add blocks that have not been indexed
-        for (int i = indexedBlocks; i < totalBlocks; i++) {
-            r.add(i);
-        }
-
-        return r;
-    }
-
-    private class Evaluator {
-        private int indexedBlocks;
-
-        Evaluator(int indexedBlocks) {
-            this.indexedBlocks = indexedBlocks;
-        }
-
-        public ConciseSet evaluate(TupleFilter filter) {
-            if (filter == null) {
-                return all();
-            }
-
-            if (filter instanceof LogicalTupleFilter)
-                return evalLogical((LogicalTupleFilter) filter);
-
-            if (filter instanceof CompareTupleFilter)
-                return evalCompare((CompareTupleFilter) filter);
-
-            // unable to evaluate
-            return all();
-        }
-
-        @SuppressWarnings("unchecked")
-        private ConciseSet evalCompare(CompareTupleFilter filter) {
-            int col = col(filter);
-            if (index[col] == null)
-                return all();
-
-            switch (filter.getOperator()) {
-            case ISNULL:
-                return index[col].getNull();
-            case ISNOTNULL:
-                return all();
-            case EQ:
-                return index[col].getEquals((ByteArray) filter.getFirstValue());
-            case NEQ:
-                return all();
-            case IN:
-                return index[col].getIn((Iterable<ByteArray>) filter.getValues());
-            case NOTIN:
-                return all();
-            case LT:
-                return index[col].getRange(null, false, (ByteArray) filter.getFirstValue(), false);
-            case LTE:
-                return index[col].getRange(null, false, (ByteArray) filter.getFirstValue(), true);
-            case GT:
-                return index[col].getRange((ByteArray) filter.getFirstValue(), false, null, false);
-            case GTE:
-                return index[col].getRange((ByteArray) filter.getFirstValue(), true, null, false);
-            default:
-                throw new IllegalStateException("Unsupported operator " + filter.getOperator());
-            }
-        }
-
-        private ConciseSet evalLogical(LogicalTupleFilter filter) {
-            List<? extends TupleFilter> children = filter.getChildren();
-
-            switch (filter.getOperator()) {
-            case AND:
-                return evalLogicalAnd(children);
-            case OR:
-                return evalLogicalOr(children);
-            case NOT:
-                return evalLogicalNot(children);
-            default:
-                throw new IllegalStateException("Unsupported operator " + filter.getOperator());
-            }
-        }
-
-        private ConciseSet evalLogicalAnd(List<? extends TupleFilter> children) {
-            ConciseSet set = all();
-
-            for (TupleFilter c : children) {
-                ConciseSet t = evaluate(c);
-                if (t == null)
-                    continue; // because it's AND
-
-                set.retainAll(t);
-            }
-            return set;
-        }
-
-        private ConciseSet evalLogicalOr(List<? extends TupleFilter> children) {
-            ConciseSet set = new ConciseSet();
-
-            for (TupleFilter c : children) {
-                ConciseSet t = evaluate(c);
-                if (t == null)
-                    return null; // because it's OR
-
-                set.addAll(t);
-            }
-            return set;
-        }
-
-        private ConciseSet evalLogicalNot(List<? extends TupleFilter> children) {
-            return all();
-        }
-
-        private ConciseSet all() {
-            return not(new ConciseSet());
-        }
-
-        private ConciseSet not(ConciseSet set) {
-            set.add(indexedBlocks);
-            set.complement();
-            return set;
-        }
-
-        private int col(CompareTupleFilter filter) {
-            return filter.getColumn().getColumnDesc().getZeroBasedIndex();
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndexOfColumn.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndexOfColumn.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndexOfColumn.java
deleted file mode 100644
index 10a944c..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndexOfColumn.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import it.uniroma3.mat.extendedset.intset.ConciseSet;
-
-import java.util.NavigableMap;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.kylin.common.util.ByteArray;
-
-import com.google.common.collect.Maps;
-
-public class GTInvertedIndexOfColumn {
-
-    final private IGTComparator comparator;
-    final private ReentrantReadWriteLock rwLock;
-
-    private int nBlocks;
-    private NavigableMap<ByteArray, ConciseSet> rangeIndex;
-    private ConciseSet nullIndex;
-
-    public GTInvertedIndexOfColumn(IGTComparator comparator) {
-        this.comparator = comparator;
-        this.rwLock = new ReentrantReadWriteLock();
-        this.rangeIndex = Maps.newTreeMap(comparator);
-        this.nullIndex = new ConciseSet();
-    }
-
-    public void add(Iterable<ByteArray> codes, int blockId) {
-        rwLock.writeLock().lock();
-        try {
-            for (ByteArray code : codes) {
-                if (comparator.isNull(code)) {
-                    nullIndex.add(blockId);
-                    continue;
-                }
-                ConciseSet set = rangeIndex.get(code);
-                if (set == null) {
-                    set = new ConciseSet();
-                    rangeIndex.put(code.copy(), set);
-                }
-                set.add(blockId);
-            }
-
-            if (blockId >= nBlocks) {
-                nBlocks = blockId + 1;
-            }
-
-        } finally {
-            rwLock.writeLock().unlock();
-        }
-    }
-
-    public ConciseSet getNull() {
-        rwLock.readLock().lock();
-        try {
-            return nullIndex.clone();
-        } finally {
-            rwLock.readLock().unlock();
-        }
-    }
-
-    public ConciseSet getEquals(ByteArray code) {
-        rwLock.readLock().lock();
-        try {
-            ConciseSet set = rangeIndex.get(code);
-            if (set == null)
-                return new ConciseSet();
-            else
-                return set.clone();
-        } finally {
-            rwLock.readLock().unlock();
-        }
-    }
-
-    public ConciseSet getIn(Iterable<ByteArray> codes) {
-        rwLock.readLock().lock();
-        try {
-            ConciseSet r = new ConciseSet();
-            for (ByteArray code : codes) {
-                ConciseSet set = rangeIndex.get(code);
-                if (set != null)
-                    r.addAll(set);
-            }
-            return r;
-        } finally {
-            rwLock.readLock().unlock();
-        }
-    }
-
-    public ConciseSet getRange(ByteArray from, boolean fromInclusive, ByteArray to, boolean toInclusive) {
-        rwLock.readLock().lock();
-        try {
-            ConciseSet r = new ConciseSet();
-            if (from == null && to == null) {
-                r.add(nBlocks);
-                r.complement();
-                return r;
-            }
-            NavigableMap<ByteArray, ConciseSet> subMap;
-            if (from == null) {
-                subMap = rangeIndex.headMap(to, toInclusive);
-            } else if (to == null) {
-                subMap = rangeIndex.tailMap(from, fromInclusive);
-            } else {
-                subMap = rangeIndex.subMap(from, fromInclusive, to, toInclusive);
-            }
-            for (ConciseSet set : subMap.values()) {
-                r.addAll(set);
-            }
-            return r;
-        } finally {
-            rwLock.readLock().unlock();
-        }
-    }
-}