You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/22 06:13:21 UTC

[31/47] incubator-kylin git commit: KYLIN-875 rename modules: core-common, core-cube, core-dictionary, core-cube

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
new file mode 100644
index 0000000..a26c2c9
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonBackReference;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Objects;
+
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.IDictionaryAware;
+import org.apache.kylin.metadata.model.IBuildable;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import java.text.SimpleDateFormat;
+import java.util.Collection;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.ConcurrentHashMap;
+
+@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
+public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, IBuildable {
+
+    @JsonBackReference
+    private CubeInstance cubeInstance;
+    @JsonProperty("uuid")
+    private String uuid;
+    @JsonProperty("name")
+    private String name;
+    @JsonProperty("storage_location_identifier")
+    private String storageLocationIdentifier; // HTable name
+    @JsonProperty("date_range_start")
+    private long dateRangeStart;
+    @JsonProperty("date_range_end")
+    private long dateRangeEnd;
+    @JsonProperty("status")
+    private SegmentStatusEnum status;
+    @JsonProperty("size_kb")
+    private long sizeKB;
+    @JsonProperty("input_records")
+    private long inputRecords;
+    @JsonProperty("input_records_size")
+    private long inputRecordsSize;
+    @JsonProperty("last_build_time")
+    private long lastBuildTime;
+    @JsonProperty("last_build_job_id")
+    private String lastBuildJobID;
+    @JsonProperty("create_time_utc")
+    private long createTimeUTC;
+
+    @JsonProperty("binary_signature")
+    private String binarySignature; // a hash of cube schema and dictionary ID, used for sanity check
+
+    @JsonProperty("dictionaries")
+    private ConcurrentHashMap<String, String> dictionaries; // table/column ==> dictionary resource path
+    @JsonProperty("snapshots")
+    private ConcurrentHashMap<String, String> snapshots; // table name ==> snapshot resource path
+
+    public CubeDesc getCubeDesc() {
+        return getCubeInstance().getDescriptor();
+    }
+
+    /**
+     * @param startDate
+     * @param endDate
+     * @return if(startDate == 0 && endDate == 0), returns "FULL_BUILD", else
+     * returns "yyyyMMddHHmmss_yyyyMMddHHmmss"
+     */
+    public static String getSegmentName(long startDate, long endDate) {
+        if (startDate == 0 && endDate == 0) {
+            return "FULL_BUILD";
+        }
+
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
+        dateFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+        return dateFormat.format(startDate) + "_" + dateFormat.format(endDate);
+    }
+
+    // ============================================================================
+
+    public String getUuid() {
+        return uuid;
+    }
+
+    public void setUuid(String id) {
+        this.uuid = id;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public long getDateRangeStart() {
+        return dateRangeStart;
+    }
+
+    public void setDateRangeStart(long dateRangeStart) {
+        this.dateRangeStart = dateRangeStart;
+    }
+
+    public long getDateRangeEnd() {
+        return dateRangeEnd;
+    }
+
+    public void setDateRangeEnd(long dateRangeEnd) {
+        this.dateRangeEnd = dateRangeEnd;
+    }
+
+    public SegmentStatusEnum getStatus() {
+        return status;
+    }
+
+    public void setStatus(SegmentStatusEnum status) {
+        this.status = status;
+    }
+
+    public long getSizeKB() {
+        return sizeKB;
+    }
+
+    public void setSizeKB(long sizeKB) {
+        this.sizeKB = sizeKB;
+    }
+
+    public long getInputRecords() {
+        return inputRecords;
+    }
+
+    public void setInputRecords(long inputRecords) {
+        this.inputRecords = inputRecords;
+    }
+
+    public long getInputRecordsSize() {
+        return inputRecordsSize;
+    }
+
+    public void setInputRecordsSize(long inputRecordsSize) {
+        this.inputRecordsSize = inputRecordsSize;
+    }
+
+    public long getLastBuildTime() {
+        return lastBuildTime;
+    }
+
+    public void setLastBuildTime(long lastBuildTime) {
+        this.lastBuildTime = lastBuildTime;
+    }
+
+    public String getLastBuildJobID() {
+        return lastBuildJobID;
+    }
+
+    public void setLastBuildJobID(String lastBuildJobID) {
+        this.lastBuildJobID = lastBuildJobID;
+    }
+
+
+    public long getCreateTimeUTC() {
+        return createTimeUTC;
+    }
+
+    public void setCreateTimeUTC(long createTimeUTC) {
+        this.createTimeUTC = createTimeUTC;
+    }
+
+    public String getBinarySignature() {
+        return binarySignature;
+    }
+
+    public void setBinarySignature(String binarySignature) {
+        this.binarySignature = binarySignature;
+    }
+
+    public CubeInstance getCubeInstance() {
+        return cubeInstance;
+    }
+
+    public void setCubeInstance(CubeInstance cubeInstance) {
+        this.cubeInstance = cubeInstance;
+    }
+
+    public String getStorageLocationIdentifier() {
+
+        return storageLocationIdentifier;
+    }
+
+    public Map<String, String> getDictionaries() {
+        if (dictionaries == null)
+            dictionaries = new ConcurrentHashMap<String, String>();
+        return dictionaries;
+    }
+
+    public Map<String, String> getSnapshots() {
+        if (snapshots == null)
+            snapshots = new ConcurrentHashMap<String, String>();
+        return snapshots;
+    }
+
+    public String getSnapshotResPath(String table) {
+        return getSnapshots().get(table);
+    }
+
+    public void putSnapshotResPath(String table, String snapshotResPath) {
+        getSnapshots().put(table, snapshotResPath);
+    }
+
+    public Collection<String> getDictionaryPaths() {
+        return getDictionaries().values();
+    }
+
+    public Collection<String> getSnapshotPaths() {
+        return getSnapshots().values();
+    }
+
+    public String getDictResPath(TblColRef col) {
+        return getDictionaries().get(dictKey(col));
+    }
+
+    public void putDictResPath(TblColRef col, String dictResPath) {
+        getDictionaries().put(dictKey(col), dictResPath);
+    }
+
+    private String dictKey(TblColRef col) {
+        return col.getTable() + "/" + col.getName();
+    }
+
+    public void setStorageLocationIdentifier(String storageLocationIdentifier) {
+        this.storageLocationIdentifier = storageLocationIdentifier;
+    }
+
+    @Override
+    public int getColumnLength(TblColRef col) {
+        Dictionary<?> dict = getDictionary(col);
+        if (dict == null) {
+            return this.getCubeDesc().getRowkey().getColumnLength(col);
+        } else {
+            return dict.getSizeOfId();
+        }
+    }
+
+    @Override
+    public Dictionary<?> getDictionary(TblColRef col) {
+        return CubeManager.getInstance(this.getCubeInstance().getConfig()).getDictionary(this, col);
+    }
+
+    public void validate() {
+        if (cubeInstance.getDescriptor().getModel().getPartitionDesc().isPartitioned() && dateRangeStart >= dateRangeEnd)
+            throw new IllegalStateException("dateRangeStart(" + dateRangeStart + ") must be smaller than dateRangeEnd(" + dateRangeEnd + ") in segment " + this);
+    }
+
+    @Override
+    public int compareTo(CubeSegment other) {
+        long comp = this.dateRangeStart - other.dateRangeStart;
+        if (comp != 0)
+            return comp < 0 ? -1 : 1;
+
+        comp = this.dateRangeEnd - other.dateRangeEnd;
+        if (comp != 0)
+            return comp < 0 ? -1 : 1;
+        else
+            return 0;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((cubeInstance == null) ? 0 : cubeInstance.hashCode());
+        result = prime * result + ((name == null) ? 0 : name.hashCode());
+        result = prime * result + ((status == null) ? 0 : status.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        CubeSegment other = (CubeSegment) obj;
+        if (cubeInstance == null) {
+            if (other.cubeInstance != null)
+                return false;
+        } else if (!cubeInstance.equals(other.cubeInstance))
+            return false;
+        if (uuid == null) {
+            if (other.uuid != null)
+                return false;
+        } else if (!uuid.equals(other.uuid))
+            return false;
+        if (name == null) {
+            if (other.name != null)
+                return false;
+        } else if (!name.equals(other.name))
+            return false;
+        if (status != other.status)
+            return false;
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return Objects.toStringHelper(this).add("uuid", uuid).add("create_time_utc:", createTimeUTC).add("name", name).add("last_build_job_id", lastBuildJobID).add("status", status).toString();
+    }
+
+
+    public void setDictionaries(ConcurrentHashMap<String, String> dictionaries) {
+        this.dictionaries = dictionaries;
+    }
+
+    public void setSnapshots(ConcurrentHashMap<String, String> snapshots) {
+        this.snapshots = snapshots;
+    }
+
+    public String getStatisticsResourcePath() {
+        return getStatisticsResourcePath(this.getCubeInstance().getName(), this.getUuid());
+    }
+
+    public static String getStatisticsResourcePath(String cubeName, String cubeSegmentId) {
+        return ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeName + "/" + cubeSegmentId + ".seq";
+    }
+
+    @Override
+    public int getSourceType() {
+        return 0;
+    }
+
+    @Override
+    public int getEngineType() {
+        return 0;
+    }
+
+    @Override
+    public int getStorageType() {
+        return 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
new file mode 100644
index 0000000..dffaa48
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube;
+
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+
+/**
+ * Hold changes to a cube so that they can be applied as one unit.
+ */
+public class CubeUpdate {
+    private CubeInstance cubeInstance;
+    private CubeSegment[] toAddSegs = null;
+    private CubeSegment[] toRemoveSegs = null;
+    private CubeSegment[] toUpdateSegs = null;
+    private RealizationStatusEnum status;
+    private String owner;
+    private int cost = -1;
+
+    public CubeUpdate(CubeInstance cubeInstance) {
+        this.cubeInstance = cubeInstance;
+    }
+
+    public CubeInstance getCubeInstance() {
+        return cubeInstance;
+    }
+
+    public CubeUpdate setCubeInstance(CubeInstance cubeInstance) {
+        this.cubeInstance = cubeInstance;
+        return this;
+    }
+
+    public CubeSegment[] getToAddSegs() {
+        return toAddSegs;
+    }
+
+    public CubeUpdate setToAddSegs(CubeSegment... toAddSegs) {
+        this.toAddSegs = toAddSegs;
+        return this;
+    }
+
+    public CubeSegment[] getToRemoveSegs() {
+        return toRemoveSegs;
+    }
+
+    public CubeUpdate setToRemoveSegs(CubeSegment... toRemoveSegs) {
+        this.toRemoveSegs = toRemoveSegs;
+        return this;
+    }
+
+    public CubeSegment[] getToUpdateSegs() {
+        return toUpdateSegs;
+    }
+
+    public CubeUpdate setToUpdateSegs(CubeSegment... toUpdateSegs) {
+        this.toUpdateSegs = toUpdateSegs;
+        return this;
+    }
+
+    public RealizationStatusEnum getStatus() {
+        return status;
+    }
+
+    public CubeUpdate setStatus(RealizationStatusEnum status) {
+        this.status = status;
+        return this;
+    }
+
+    public String getOwner() {
+        return owner;
+    }
+
+    public CubeUpdate setOwner(String owner) {
+        this.owner = owner;
+        return this;
+    }
+
+    public int getCost() {
+        return cost;
+    }
+
+    public CubeUpdate setCost(int cost) {
+        this.cost = cost;
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
new file mode 100644
index 0000000..feb0f1a
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.cli;
+
+import java.io.IOException;
+
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+
+public class DictionaryGeneratorCLI {
+
+    private static final Logger logger = LoggerFactory.getLogger(DictionaryGeneratorCLI.class);
+
+    public static void processSegment(KylinConfig config, String cubeName, String segmentName, String factColumnsPath) throws IOException {
+        CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
+        CubeSegment segment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+
+        processSegment(config, segment, factColumnsPath);
+    }
+
+    private static void processSegment(KylinConfig config, CubeSegment cubeSeg, String factColumnsPath) throws IOException {
+        CubeManager cubeMgr = CubeManager.getInstance(config);
+
+        for (DimensionDesc dim : cubeSeg.getCubeDesc().getDimensions()) {
+            // dictionary
+            for (TblColRef col : dim.getColumnRefs()) {
+                if (cubeSeg.getCubeDesc().getRowkey().isUseDictionary(col)) {
+                    logger.info("Building dictionary for " + col);
+                    cubeMgr.buildDictionary(cubeSeg, col, factColumnsPath);
+                }
+            }
+
+            // build snapshot
+            if (dim.getTable() != null && !dim.getTable().equalsIgnoreCase(cubeSeg.getCubeDesc().getFactTable())) {
+                // CubeSegment seg = cube.getTheOnlySegment();
+                logger.info("Building snapshot of " + dim.getTable());
+                cubeMgr.buildSnapshotTable(cubeSeg, dim.getTable());
+                logger.info("Checking snapshot of " + dim.getTable());
+                cubeMgr.getLookupTable(cubeSeg, dim); // load the table for
+                                                      // sanity check
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/cli/DumpDictionaryCLI.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DumpDictionaryCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DumpDictionaryCLI.java
new file mode 100644
index 0000000..24daddf
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DumpDictionaryCLI.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.cli;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Date;
+
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryInfoSerializer;
+
+public class DumpDictionaryCLI {
+
+    public static void main(String[] args) throws IOException {
+        for (String path : args) {
+            dump(new File(path));
+        }
+    }
+
+    public static void dump(File f) throws IOException {
+        if (f.isDirectory()) {
+            for (File c : f.listFiles())
+                dump(c);
+            return;
+        }
+
+        if (f.getName().endsWith(".dict")) {
+            DictionaryInfoSerializer ser = new DictionaryInfoSerializer();
+            DictionaryInfo dictInfo = ser.deserialize(new DataInputStream(new FileInputStream(f)));
+
+            System.out.println("============================================================================");
+            System.out.println("File: " + f.getAbsolutePath());
+            System.out.println(new Date(dictInfo.getLastModified()));
+            System.out.println(JsonUtil.writeValueAsIndentString(dictInfo));
+            dictInfo.getDictionaryObject().dump(System.out);
+            System.out.println();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
new file mode 100644
index 0000000..48db0d8
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.common;
+
+import org.apache.kylin.common.util.SplittedBytes;
+import org.apache.kylin.common.util.Bytes;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.kv.RowKeyColumnIO;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * @author George Song (ysong1)
+ * 
+ */
+public class RowKeySplitter {
+
+    private CubeDesc cubeDesc;
+    private RowKeyColumnIO colIO;
+
+    private SplittedBytes[] splitBuffers;
+    private int bufferSize;
+
+    public SplittedBytes[] getSplitBuffers() {
+        return splitBuffers;
+    }
+
+    public int getBufferSize() {
+        return bufferSize;
+    }
+
+    public RowKeySplitter(CubeSegment cubeSeg, int splitLen, int bytesLen) {
+        this.cubeDesc = cubeSeg.getCubeDesc();
+        this.colIO = new RowKeyColumnIO(cubeSeg);
+
+        this.splitBuffers = new SplittedBytes[splitLen];
+        for (int i = 0; i < splitLen; i++) {
+            this.splitBuffers[i] = new SplittedBytes(bytesLen);
+        }
+        this.bufferSize = 0;
+    }
+
+    /**
+     * @param bytes
+     * @param byteLen
+     * @return cuboid ID
+     */
+    public long split(byte[] bytes, int byteLen) {
+        this.bufferSize = 0;
+        int offset = 0;
+
+        // extract cuboid id
+        SplittedBytes cuboidIdSplit = this.splitBuffers[this.bufferSize++];
+        cuboidIdSplit.length = RowConstants.ROWKEY_CUBOIDID_LEN;
+        System.arraycopy(bytes, offset, cuboidIdSplit.value, 0, RowConstants.ROWKEY_CUBOIDID_LEN);
+        offset += RowConstants.ROWKEY_CUBOIDID_LEN;
+
+        long cuboidId = Bytes.toLong(cuboidIdSplit.value, 0, cuboidIdSplit.length);
+        Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
+
+        // rowkey columns
+        for (int i = 0; i < cuboid.getColumns().size(); i++) {
+            TblColRef col = cuboid.getColumns().get(i);
+            int colLength = colIO.getColumnLength(col);
+            SplittedBytes split = this.splitBuffers[this.bufferSize++];
+            split.length = colLength;
+            System.arraycopy(bytes, offset, split.value, 0, colLength);
+            offset += colLength;
+        }
+
+        return cuboidId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
new file mode 100644
index 0000000..499ba28
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.cuboid;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.kylin.common.util.Bytes;
+
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.RowKeyColDesc;
+import org.apache.kylin.cube.model.RowKeyDesc;
+import org.apache.kylin.cube.model.RowKeyDesc.AggrGroupMask;
+import org.apache.kylin.cube.model.RowKeyDesc.HierarchyMask;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * @author George Song (ysong1)
+ */
+public class Cuboid implements Comparable<Cuboid> {
+
+    private final static Map<String, Map<Long, Cuboid>> CUBOID_CACHE = new ConcurrentHashMap<String, Map<Long, Cuboid>>();
+
+    public static Cuboid findById(CubeDesc cube, byte[] cuboidID) {
+        return findById(cube, Bytes.toLong(cuboidID));
+    }
+
+    public static Cuboid findById(CubeDesc cube, long cuboidID) {
+        Map<Long, Cuboid> cubeCache = CUBOID_CACHE.get(cube.getName());
+        if (cubeCache == null) {
+            cubeCache = new ConcurrentHashMap<Long, Cuboid>();
+            CUBOID_CACHE.put(cube.getName(), cubeCache);
+        }
+        Cuboid cuboid = cubeCache.get(cuboidID);
+        if (cuboid == null) {
+            long validCuboidID = translateToValidCuboid(cube, cuboidID);
+            cuboid = new Cuboid(cube, cuboidID, validCuboidID);
+            cubeCache.put(cuboidID, cuboid);
+        }
+        return cuboid;
+
+    }
+
+    public static boolean isValid(CubeDesc cube, long cuboidID) {
+        RowKeyDesc rowkey = cube.getRowkey();
+
+        if (cuboidID < 0) {
+            throw new IllegalArgumentException("Cuboid " + cuboidID + " should be greater than 0");
+        }
+
+        if (checkBaseCuboid(rowkey, cuboidID)) {
+            return true;
+        }
+
+        if (checkMandatoryColumns(rowkey, cuboidID) == false) {
+            return false;
+        }
+
+        if (checkAggregationGroup(rowkey, cuboidID) == false) {
+            return false;
+        }
+
+        if (checkHierarchy(rowkey, cuboidID) == false) {
+            return false;
+        }
+
+        return true;
+    }
+
+    public static long getBaseCuboidId(CubeDesc cube) {
+        return cube.getRowkey().getFullMask();
+    }
+
+    public static Cuboid getBaseCuboid(CubeDesc cube) {
+        return findById(cube, getBaseCuboidId(cube));
+    }
+
+    // Breadth-First-Search
+    private static long translateToValidCuboid(CubeDesc cube, long cuboidID) {
+        if (Cuboid.isValid(cube, cuboidID)) {
+            return cuboidID;
+        }
+
+        HashSet<Long> dedupped = new HashSet<Long>();
+        Queue<Long> queue = new LinkedList<Long>();
+        List<Long> parents = Cuboid.getAllPossibleParents(cube, cuboidID);
+
+        // check each parent
+        addToQueue(queue, parents, dedupped);
+        while (queue.size() > 0) {
+            long parent = pollFromQueue(queue, dedupped);
+            if (Cuboid.isValid(cube, parent)) {
+                return parent;
+            } else {
+                addToQueue(queue, Cuboid.getAllPossibleParents(cube, parent), dedupped);
+            }
+        }
+        return -1;
+    }
+
+    private static List<Long> getAllPossibleParents(CubeDesc cube, long cuboidID) {
+        List<Long> allPossibleParents = new ArrayList<Long>();
+
+        for (int i = 0; i < cube.getRowkey().getRowKeyColumns().length; i++) {
+            long mask = 1L << i;
+            long parentId = cuboidID | mask;
+            if (parentId != cuboidID) {
+                allPossibleParents.add(parentId);
+            }
+        }
+
+        return allPossibleParents;
+    }
+
+    private static void addToQueue(Queue<Long> queue, List<Long> parents, HashSet<Long> dedupped) {
+        Collections.sort(parents);
+        for (Long p : parents) {
+            if (!dedupped.contains(p)) {
+                dedupped.add(p);
+                queue.offer(p);
+            }
+        }
+    }
+
+    private static long pollFromQueue(Queue<Long> queue, HashSet<Long> dedupped) {
+        long element = queue.poll();
+        dedupped.remove(element);
+        return element;
+    }
+
+    private static boolean checkBaseCuboid(RowKeyDesc rowkey, long cuboidID) {
+        long baseCuboidId = rowkey.getFullMask();
+        if (cuboidID > baseCuboidId) {
+            throw new IllegalArgumentException("Cubiod " + cuboidID + " is out of scope 0-" + baseCuboidId);
+        }
+        return baseCuboidId == cuboidID;
+    }
+
+    private static boolean checkMandatoryColumns(RowKeyDesc rowkey, long cuboidID) {
+        long mandatoryColumnMask = rowkey.getMandatoryColumnMask();
+
+        // note the all-zero cuboid (except for mandatory) is not valid
+        if (cuboidID <= mandatoryColumnMask)
+            return false;
+
+        return (cuboidID & mandatoryColumnMask) == mandatoryColumnMask;
+    }
+
+    private static boolean checkHierarchy(RowKeyDesc rowkey, long cuboidID) {
+        List<HierarchyMask> hierarchyMaskList = rowkey.getHierarchyMasks();
+        // if no hierarchy defined in metadata
+        if (hierarchyMaskList == null || hierarchyMaskList.size() == 0) {
+            return true;
+        }
+
+        hier: for (HierarchyMask hierarchyMasks : hierarchyMaskList) {
+            long result = cuboidID & hierarchyMasks.fullMask;
+            if (result > 0) {
+                // if match one of the hierarchy constrains, return true;
+                for (long mask : hierarchyMasks.allMasks) {
+                    if (result == mask) {
+                        continue hier;
+                    }
+                }
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private static boolean checkAggregationGroup(RowKeyDesc rowkey, long cuboidID) {
+        long cuboidWithoutMandatory = cuboidID & ~rowkey.getMandatoryColumnMask();
+        long leftover;
+        for (AggrGroupMask mask : rowkey.getAggrGroupMasks()) {
+            if ((cuboidWithoutMandatory & mask.uniqueMask) != 0) {
+                leftover = cuboidWithoutMandatory & ~mask.groupMask;
+                return leftover == 0 || leftover == mask.leftoverMask;
+            }
+        }
+
+        leftover = cuboidWithoutMandatory & rowkey.getTailMask();
+        return leftover == 0 || leftover == rowkey.getTailMask();
+    }
+    
+    // ============================================================================
+
+    private CubeDesc cube;
+    private final long inputID;
+    private final long id;
+    private final byte[] idBytes;
+    private final boolean requirePostAggregation;
+    private List<TblColRef> dimensionColumns;
+
+    // will translate the cuboidID if it is not valid
+    private Cuboid(CubeDesc cube, long originalID, long validID) {
+        this.cube = cube;
+        this.inputID = originalID;
+        this.id = validID;
+        this.idBytes = Bytes.toBytes(id);
+        this.dimensionColumns = translateIdToColumns(this.id);
+        this.requirePostAggregation = calcExtraAggregation(this.inputID, this.id) != 0;
+    }
+
+    private List<TblColRef> translateIdToColumns(long cuboidID) {
+        List<TblColRef> dimesnions = new ArrayList<TblColRef>();
+        RowKeyColDesc[] allColumns = cube.getRowkey().getRowKeyColumns();
+        for (int i = 0; i < allColumns.length; i++) {
+            // NOTE: the order of column in list!!!
+            long bitmask = 1L << allColumns[i].getBitIndex();
+            if ((cuboidID & bitmask) != 0) {
+                TblColRef colRef = allColumns[i].getColRef();
+                dimesnions.add(colRef);
+            }
+        }
+        return dimesnions;
+    }
+
+    private long calcExtraAggregation(long inputID, long id) {
+        long diff = id ^ inputID;
+        return eliminateHierarchyAggregation(diff);
+    }
+
+    // higher level in hierarchy can be ignored when counting aggregation columns
+    private long eliminateHierarchyAggregation(long id) {
+        List<HierarchyMask> hierarchyMaskList = cube.getRowkey().getHierarchyMasks();
+        if (hierarchyMaskList != null && hierarchyMaskList.size() > 0) {
+            for (HierarchyMask hierMask : hierarchyMaskList) {
+                long[] allMasks = hierMask.allMasks;
+                for (int i = allMasks.length - 1; i > 0; i--) {
+                    long bit = allMasks[i] ^ allMasks[i - 1];
+                    if ((inputID & bit) != 0) {
+                        id &= ~allMasks[i - 1];
+                    }
+                }
+            }
+        }
+        return id;
+    }
+
+    public CubeDesc getCube() {
+        return cube;
+    }
+
+    public List<TblColRef> getColumns() {
+        return dimensionColumns;
+    }
+
+    public List<TblColRef> getAggregationColumns() {
+        long aggrColsID = eliminateHierarchyAggregation(id);
+        return translateIdToColumns(aggrColsID);
+    }
+
+    public long getId() {
+        return id;
+    }
+
+    public byte[] getBytes() {
+        return idBytes;
+    }
+
+    public long getInputID() {
+        return inputID;
+    }
+
+    public boolean useAncestor() {
+        return inputID != id;
+    }
+
+    public boolean requirePostAggregation() {
+        return requirePostAggregation;
+    }
+
+    public static void clearCache() {
+        CUBOID_CACHE.clear();
+    }
+
+    public static void reloadCache(String cubeDescName) {
+        CUBOID_CACHE.remove(cubeDescName);
+    }
+
+    @Override
+    public String toString() {
+        return "Cuboid [id=" + id + "]";
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + (int) (id ^ (id >>> 32));
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        Cuboid other = (Cuboid) obj;
+        if (id != other.id)
+            return false;
+        return true;
+    }
+
+    @Override
+    public int compareTo(Cuboid o) {
+        if (this.id < o.id) {
+            return -1;
+        } else if (this.id > o.id) {
+            return 1;
+        } else {
+            return 0;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidCLI.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidCLI.java
new file mode 100644
index 0000000..c6dc55a
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidCLI.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.cuboid;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.TreeSet;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.RowKeyDesc;
+import org.apache.kylin.cube.model.RowKeyDesc.AggrGroupMask;
+import org.apache.kylin.cube.model.RowKeyDesc.HierarchyMask;
+
+/**
+ * @author yangli9
+ * 
+ */
+public class CuboidCLI {
+
+    public static void main(String[] args) throws IOException {
+        CubeDescManager cubeDescMgr = CubeDescManager.getInstance(KylinConfig.getInstanceFromEnv());
+
+        if ("test".equals(args[0])) {
+            CubeDesc cubeDesc = cubeDescMgr.getCubeDesc(args[1]);
+            simulateCuboidGeneration(cubeDesc);
+        }
+    }
+
+    public static int simulateCuboidGeneration(CubeDesc cube) {
+        CuboidScheduler scheduler = new CuboidScheduler(cube);
+
+        long baseCuboid = Cuboid.getBaseCuboidId(cube);
+        Collection<Long> cuboidSet = new TreeSet<Long>();
+        cuboidSet.add(baseCuboid);
+        LinkedList<Long> cuboidQueue = new LinkedList<Long>();
+        cuboidQueue.push(baseCuboid);
+        while (!cuboidQueue.isEmpty()) {
+            long cuboid = cuboidQueue.pop();
+            Collection<Long> spnanningCuboids = scheduler.getSpanningCuboid(cuboid);
+            for (Long sc : spnanningCuboids) {
+                boolean notfound = cuboidSet.add(sc);
+                if (!notfound) {
+                    throw new IllegalStateException("Find duplicate spanning cuboid " + sc + " from cuboid " + cuboid);
+                }
+                cuboidQueue.push(sc);
+            }
+        }
+
+        TreeSet<Long> enumCuboids = enumCalcCuboidCount(cube);
+        if (enumCuboids.equals(cuboidSet) == false) {
+            throw new IllegalStateException("Expected cuboid set " + enumCuboids + "; but actual cuboid set " + cuboidSet);
+        }
+
+        int mathCount = mathCalcCuboidCount(cube);
+        if (mathCount != enumCuboids.size()) {
+            throw new IllegalStateException("Math cuboid count " + mathCount + ", but actual cuboid count " + enumCuboids.size());
+        }
+
+        return mathCount;
+
+    }
+
+    public static TreeSet<Long> enumCalcCuboidCount(CubeDesc cube) {
+        long baseCuboid = Cuboid.getBaseCuboidId(cube);
+        TreeSet<Long> expectedCuboids = new TreeSet<Long>();
+        for (long cuboid = 0; cuboid <= baseCuboid; cuboid++) {
+            if (Cuboid.isValid(cube, cuboid)) {
+                expectedCuboids.add(cuboid);
+            }
+        }
+        return expectedCuboids;
+    }
+
+    public static int[] calculateAllLevelCount(CubeDesc cube) {
+        int levels = cube.getRowkey().getNCuboidBuildLevels();
+        int[] allLevelCounts = new int[levels + 1];
+
+        CuboidScheduler scheduler = new CuboidScheduler(cube);
+        LinkedList<Long> nextQueue = new LinkedList<Long>();
+        LinkedList<Long> currentQueue = new LinkedList<Long>();
+        long baseCuboid = Cuboid.getBaseCuboidId(cube);
+        currentQueue.push(baseCuboid);
+
+        for (int i = 0; i <= levels; i++) {
+            allLevelCounts[i] = currentQueue.size();
+            while (!currentQueue.isEmpty()) {
+                long cuboid = currentQueue.pop();
+                Collection<Long> spnanningCuboids = scheduler.getSpanningCuboid(cuboid);
+                nextQueue.addAll(spnanningCuboids);
+            }
+            currentQueue = nextQueue;
+            nextQueue = new LinkedList<Long>();
+        }
+
+        return allLevelCounts;
+    }
+
+    public static int mathCalcCuboidCount(CubeDesc cube) {
+        int result = 1; // 1 for base cuboid
+
+        RowKeyDesc rowkey = cube.getRowkey();
+        AggrGroupMask[] aggrGroupMasks = rowkey.getAggrGroupMasks();
+        for (int i = 0; i < aggrGroupMasks.length; i++) {
+            boolean hasTail = i < aggrGroupMasks.length - 1 || rowkey.getTailMask() > 0;
+            result += mathCalcCuboidCount_aggrGroup(rowkey, aggrGroupMasks[i], hasTail);
+        }
+
+        return result;
+    }
+
+    private static int mathCalcCuboidCount_aggrGroup(RowKeyDesc rowkey, AggrGroupMask aggrGroupMask, boolean hasTail) {
+        long groupMask = aggrGroupMask.groupMask;
+        int n = mathCalcCuboidCount_combination(rowkey, groupMask);
+        n -= 2; // exclude group all 1 and all 0
+
+        long nonUniqueMask = groupMask & (~aggrGroupMask.uniqueMask);
+        if (nonUniqueMask > 0) {
+            // exclude duplicates caused by non-unique columns
+            // FIXME this assumes non-unique masks consolidates in ONE following group which maybe not be true
+            n -= mathCalcCuboidCount_combination(rowkey, nonUniqueMask) - 1; // exclude all 0
+        }
+
+        if (hasTail) {
+            n *= 2; // tail being 1 and 0
+            n += 2; // +1 for group all 1 and tail 0; +1 for group all 0 and tail 1
+        }
+
+        return n;
+    }
+
+    private static int mathCalcCuboidCount_combination(RowKeyDesc rowkey, long colMask) {
+        if (colMask == 0) // no column selected
+            return 0;
+
+        int count = 1;
+
+        for (HierarchyMask hierMask : rowkey.getHierarchyMasks()) {
+            long hierBits = colMask & hierMask.fullMask;
+            if (hierBits != 0) {
+                count *= Long.bitCount(hierBits) + 1; // +1 is for all-zero case
+                colMask &= ~hierBits;
+            }
+        }
+
+        count *= Math.pow(2, Long.bitCount(colMask));
+
+        return count;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
new file mode 100644
index 0000000..bebfd08
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.cuboid;
+
+/** 
+ */
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.RowKeyDesc;
+import org.apache.kylin.cube.model.RowKeyDesc.AggrGroupMask;
+
+public class CuboidScheduler {
+
+    private final CubeDesc cubeDef;
+    private final int size;
+    private final long max;
+    private final Map<Long, List<Long>> cache;
+
+    public CuboidScheduler(CubeDesc cube) {
+        this.cubeDef = cube;
+        this.size = cube.getRowkey().getRowKeyColumns().length;
+        this.max = (long) Math.pow(2, size) - 1;
+        this.cache = new ConcurrentHashMap<Long, List<Long>>();
+    }
+    
+    public int getCuboidCount() {
+        return getCuboidCount(Cuboid.getBaseCuboidId(cubeDef));
+    }
+
+    private int getCuboidCount(long cuboidId) {
+        int r = 1;
+        for (Long child : getSpanningCuboid(cuboidId)) {
+            r += getCuboidCount(child);
+        }
+        return r;
+    }
+
+    public List<Long> getSpanningCuboid(long cuboid) {
+        if (cuboid > max || cuboid < 0) {
+            throw new IllegalArgumentException("Cuboid " + cuboid + " is out of scope 0-" + max);
+        }
+
+        List<Long> result = cache.get(cuboid);
+        if (result != null) {
+            return result;
+        }
+
+        // smaller sibling's children
+        Collection<Long> allPrevOffspring = new HashSet<Long>();
+        for (Long sibling : findSmallerSibling(cuboid)) {
+            Collection<Long> prevOffsprings = generateChildren(sibling);
+            allPrevOffspring.addAll(prevOffsprings);
+        }
+
+        // my children is my generation excluding smaller sibling's generation
+        result = new ArrayList<Long>();
+        for (Long offspring : generateChildren(cuboid)) {
+            if (!allPrevOffspring.contains(offspring)) {
+                result.add(offspring);
+            }
+        }
+
+        cache.put(cuboid, result);
+        return result;
+    }
+
+    private Collection<Long> generateChildren(long cuboid) {
+        Collection<Long> result = new HashSet<Long>();
+
+        // generate zero tail cuboid -- the one with all 1 in the first
+        // aggregation group and all 0 for the rest bits
+        generateZeroTailBase(cuboid, result);
+
+        RowKeyDesc rowkey = cubeDef.getRowkey();
+        long cuboidWithoutMandatory = cuboid & ~rowkey.getMandatoryColumnMask();
+        for (AggrGroupMask mask : rowkey.getAggrGroupMasks()) {
+            if (belongTo(cuboidWithoutMandatory, mask) == false)
+                continue;
+
+            long[] groupOneBitMasks = mask.groupOneBitMasks;
+            for (int i = 0; i < groupOneBitMasks.length; i++) {
+                long oneBit = groupOneBitMasks[i];
+                if ((cuboid & oneBit) == 0)
+                    continue;
+
+                long child = cuboid ^ oneBit;
+                if (Cuboid.isValid(cubeDef, child)) {
+                    result.add(child);
+                }
+            }
+
+            if ((cuboidWithoutMandatory & mask.uniqueMask) > 0)
+                break;
+        }
+
+        return result;
+    }
+
+    private void generateZeroTailBase(long cuboid, Collection<Long> result) {
+        RowKeyDesc rowkey = cubeDef.getRowkey();
+
+        long cuboidWithoutMandatory = cuboid & ~rowkey.getMandatoryColumnMask();
+
+        for (AggrGroupMask mask : rowkey.getAggrGroupMasks()) {
+            if ((cuboidWithoutMandatory & mask.groupMask) == mask.groupMask && (cuboidWithoutMandatory & mask.leftoverMask) == mask.leftoverMask) {
+                long zeroTail = rowkey.getMandatoryColumnMask() | mask.groupMask;
+                if (zeroTail > 0 && zeroTail != cuboid) {
+                    result.add(zeroTail);
+                }
+            }
+            if ((cuboidWithoutMandatory & mask.uniqueMask) > 0)
+                break;
+        }
+    }
+
+    public Collection<Long> findSmallerSibling(long cuboid) {
+        if (!Cuboid.isValid(cubeDef, cuboid)) {
+            return Collections.emptyList();
+        }
+
+        RowKeyDesc rowkey = cubeDef.getRowkey();
+
+        // do combination in all related groups
+        long groupAllBitMask = 0;
+        for (AggrGroupMask mask : rowkey.getAggrGroupMasks()) {
+            if ((mask.groupMask & cuboid) > 0) {
+                groupAllBitMask |= mask.groupMask;
+            }
+        }
+
+        long groupBitValue = cuboid & groupAllBitMask;
+        long leftBitValue = cuboid & ~groupAllBitMask;
+        long[] groupOneBits = bits(groupAllBitMask);
+
+        Collection<Long> siblings = new HashSet<Long>();
+        combination(cuboid, siblings, groupOneBits, 0, leftBitValue, Long.bitCount(groupBitValue));
+        return siblings;
+    }
+
+    private long[] bits(long groupAllBitMask) {
+        int size = Long.bitCount(groupAllBitMask);
+        long[] r = new long[size];
+        long l = groupAllBitMask;
+        int i = 0;
+        while (l != 0) {
+            long bit = Long.highestOneBit(l);
+            r[i++] = bit;
+            l ^= bit;
+        }
+        return r;
+    }
+
+    private void combination(long cuboid, Collection<Long> siblings, long[] bitMasks, int offset, long bitValue, int k) {
+        if (k == 0) {
+            if (Cuboid.isValid(cubeDef, bitValue)) {
+                siblings.add(bitValue);
+            }
+        } else {
+            for (int i = offset; i < bitMasks.length; i++) {
+                long newBitValue = bitValue | bitMasks[i];
+                if (newBitValue < cuboid) {
+                    combination(cuboid, siblings, bitMasks, i + 1, newBitValue, k - 1);
+                }
+            }
+        }
+    }
+
+    private boolean belongTo(long cuboidWithoutMandatory, AggrGroupMask mask) {
+        long groupBits = cuboidWithoutMandatory & mask.groupMask;
+        long leftoverBits = cuboidWithoutMandatory & mask.leftoverMask;
+        return groupBits > 0 && (leftoverBits == 0 || leftoverBits == mask.leftoverMask);
+    }
+
+    public int getCardinality(long cuboid) {
+        if (cuboid > max || cuboid < 0) {
+            throw new IllegalArgumentException("Cubiod " + cuboid + " is out of scope 0-" + max);
+        }
+
+        return Long.bitCount(cuboid);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
new file mode 100644
index 0000000..ddfbab3
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.kv;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * 
+ * @author xjiang
+ * 
+ */
+public abstract class AbstractRowKeyEncoder {
+
+    public static final byte DEFAULT_BLANK_BYTE = Dictionary.NULL;
+
+    protected static final Logger logger = LoggerFactory.getLogger(AbstractRowKeyEncoder.class);
+
+    private static final Map<String, Map<Long, AbstractRowKeyEncoder>> ENCODER_CACHE = new ConcurrentHashMap<String, Map<Long, AbstractRowKeyEncoder>>();
+
+    public static AbstractRowKeyEncoder createInstance(CubeSegment cubeSeg, Cuboid cuboid) {
+
+        // The storage location identifier is unique for every segment
+        Map<Long, AbstractRowKeyEncoder> cubeCache = ENCODER_CACHE.get(cubeSeg.getStorageLocationIdentifier());
+
+        if (cubeCache == null) {
+            cubeCache = new HashMap<Long, AbstractRowKeyEncoder>();
+            ENCODER_CACHE.put(cuboid.getCube().getName(), cubeCache);
+        }
+
+        AbstractRowKeyEncoder encoder = cubeCache.get(cuboid.getId());
+        if (encoder == null) {
+            encoder = new RowKeyEncoder(cubeSeg, cuboid);
+            cubeCache.put(cuboid.getId(), encoder);
+        }
+        return encoder;
+    }
+
+    protected final Cuboid cuboid;
+    protected byte blankByte = DEFAULT_BLANK_BYTE;
+
+    protected AbstractRowKeyEncoder(Cuboid cuboid) {
+        this.cuboid = cuboid;
+    }
+
+    public void setBlankByte(byte blankByte) {
+        this.blankByte = blankByte;
+    }
+
+    abstract public byte[] encode(Map<TblColRef, String> valueMap);
+
+    abstract public byte[] encode(byte[][] values);
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java
new file mode 100644
index 0000000..a17bb28
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.kv;
+
+import java.util.Arrays;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+
+/**
+ * 
+ * @author xjiang
+ * 
+ */
+public class FuzzyKeyEncoder extends RowKeyEncoder {
+
+    public FuzzyKeyEncoder(CubeSegment seg, Cuboid cuboid) {
+        super(seg, cuboid);
+    }
+
+    @Override
+    protected byte[] defaultValue(int length) {
+        byte[] keyBytes = new byte[length];
+        Arrays.fill(keyBytes, RowConstants.FUZZY_MASK_ZERO);
+        return keyBytes;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
new file mode 100644
index 0000000..5077287
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.kv;
+
+import java.util.Arrays;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * 
+ * @author xjiang
+ * 
+ */
+public class FuzzyMaskEncoder extends RowKeyEncoder {
+
+    public FuzzyMaskEncoder(CubeSegment seg, Cuboid cuboid) {
+        super(seg, cuboid);
+    }
+
+    @Override
+    protected int fillHeader(byte[] bytes, byte[][] values) {
+        // always fuzzy match cuboid ID to lock on the selected cuboid
+        int cuboidStart = this.headerLength - RowConstants.ROWKEY_CUBOIDID_LEN;
+        Arrays.fill(bytes, 0, cuboidStart, RowConstants.FUZZY_MASK_ONE);
+        Arrays.fill(bytes, cuboidStart, this.headerLength, RowConstants.FUZZY_MASK_ZERO);
+        return this.headerLength;
+    }
+
+    @Override
+    protected void fillColumnValue(TblColRef column, int columnLen, byte[] value, int valueLen, byte[] outputValue, int outputValueOffset) {
+        if (value == null) {
+            Arrays.fill(outputValue, outputValueOffset, outputValueOffset + columnLen, RowConstants.FUZZY_MASK_ONE);
+        } else {
+            Arrays.fill(outputValue, outputValueOffset, outputValueOffset + columnLen, RowConstants.FUZZY_MASK_ZERO);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
new file mode 100644
index 0000000..95b4718
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.kv;
+
+/**
+ * 
+ * @author xjiang
+ * 
+ */
+public class RowConstants {
+
+    // row key fixed length place holder
+    public static final byte ROWKEY_PLACE_HOLDER_BYTE = 9;
+    // row key lower bound
+    public static final byte ROWKEY_LOWER_BYTE = 0;
+    // row key upper bound
+    public static final byte ROWKEY_UPPER_BYTE = (byte) 0xff;
+    // row key cuboid id length
+    public static final int ROWKEY_CUBOIDID_LEN = 8;
+
+    // fuzzy mask
+    public static final byte FUZZY_MASK_ZERO = 0;
+    public static final byte FUZZY_MASK_ONE = 1;
+
+    // row value delimiter
+    public static final byte ROWVALUE_DELIMITER_BYTE = 7;
+    public static final String ROWVALUE_DELIMITER_STRING = String.valueOf((char) 7);
+    public static final byte[] ROWVALUE_DELIMITER_BYTES = { 7 };
+
+    public static final int ROWVALUE_BUFFER_SIZE = 1024 * 1024; // 1 MB
+
+    // marker class
+    public static final byte[][] BYTE_ARR_MARKER = new byte[0][];
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java
new file mode 100644
index 0000000..a2add5c
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.kv;
+
+import java.util.Arrays;
+
+import org.apache.kylin.dict.IDictionaryAware;
+import org.apache.kylin.common.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * Read/Write column values from/into bytes
+ *
+ * @author yangli9
+ */
+@SuppressWarnings("unchecked")
+public class RowKeyColumnIO {
+
+    private static final Logger logger = LoggerFactory.getLogger(RowKeyColumnIO.class);
+
+    private IDictionaryAware IDictionaryAwareness;
+
+    public RowKeyColumnIO(IDictionaryAware IDictionaryAwareness) {
+        this.IDictionaryAwareness = IDictionaryAwareness;
+    }
+
+    public int getColumnLength(TblColRef col) {
+        return IDictionaryAwareness.getColumnLength(col);
+    }
+
+    //TODO is type cast really necessary here?
+    public Dictionary<String> getDictionary(TblColRef col) {
+        return (Dictionary<String>) IDictionaryAwareness.getDictionary(col);
+    }
+
+    public void writeColumnWithoutDictionary(byte[] src, int srcOffset, int srcLength, byte[] dst, int dstOffset, int dstLength) {
+        if (srcLength >= dstLength) {
+            System.arraycopy(src, srcOffset, dst, dstOffset, dstLength);
+        } else {
+            System.arraycopy(src, srcOffset, dst, dstOffset, srcLength);
+            Arrays.fill(dst, dstOffset + srcLength, dstOffset + dstLength, RowConstants.ROWKEY_PLACE_HOLDER_BYTE);
+        }
+    }
+
+    public void writeColumnWithDictionary(Dictionary<String> dictionary, byte[] src, int srcOffset, int srcLength, byte[] dst, int dstOffset, int dstLength, int roundingFlag, int defaultValue) {
+        // dict value
+        try {
+            int id = dictionary.getIdFromValueBytes(src, srcOffset, srcLength, roundingFlag);
+            BytesUtil.writeUnsigned(id, dst, dstOffset, dictionary.getSizeOfId());
+        } catch (IllegalArgumentException ex) {
+            Arrays.fill(dst, dstOffset, dstOffset + dstLength, (byte) defaultValue);
+            logger.error("Can't translate value " + Bytes.toString(src, srcOffset, srcLength) + " to dictionary ID, roundingFlag " + roundingFlag + ". Using default value " + String.format("\\x%02X", defaultValue));
+        }
+    }
+
+
+
+    public void writeColumn(TblColRef column, byte[] value, int valueLen, byte defaultValue, byte[] output, int outputOffset) {
+        writeColumn(column, value, valueLen, 0, defaultValue, output, outputOffset);
+    }
+
+    public void writeColumn(TblColRef column, byte[] value, int valueLen, int roundingFlag, byte defaultValue, byte[] output, int outputOffset) {
+
+        final Dictionary<String> dict = getDictionary(column);
+        final int columnLen = getColumnLength(column);
+
+        // non-dict value
+        if (dict == null) {
+            byte[] valueBytes = padFixLen(columnLen, value, valueLen);
+            System.arraycopy(valueBytes, 0, output, outputOffset, columnLen);
+            return;
+        }
+
+        // dict value
+        try {
+            int id = dict.getIdFromValueBytes(value, 0, valueLen, roundingFlag);
+            BytesUtil.writeUnsigned(id, output, outputOffset, dict.getSizeOfId());
+        } catch (IllegalArgumentException ex) {
+            for (int i = outputOffset; i < outputOffset + columnLen; i++) {
+                output[i] = defaultValue;
+            }
+            logger.error("Can't translate value " + Bytes.toString(value, 0, valueLen) + " to dictionary ID, roundingFlag " + roundingFlag + ". Using default value " + String.format("\\x%02X", defaultValue));
+        }
+    }
+
+    private byte[] padFixLen(int length, byte[] valueBytes, int valLen) {
+        if (valLen == length) {
+            return valueBytes;
+        } else if (valLen < length) {
+            byte[] newValueBytes = new byte[length];
+            System.arraycopy(valueBytes, 0, newValueBytes, 0, valLen);
+            Arrays.fill(newValueBytes, valLen, length, RowConstants.ROWKEY_PLACE_HOLDER_BYTE);
+            return newValueBytes;
+        } else {
+            return Arrays.copyOf(valueBytes, length);
+        }
+    }
+
+    public String readColumnString(TblColRef col, byte[] bytes, int offset, int length) {
+        Dictionary<String> dict = getDictionary(col);
+        if (dict == null) {
+            if (isNull(bytes, offset, length)) {
+                return null;
+            }
+            bytes = removeFixLenPad(bytes, offset, length);
+            return Bytes.toString(bytes);
+        } else {
+            int id = BytesUtil.readUnsigned(bytes, offset, length);
+            try {
+                String value = dict.getValueFromId(id);
+                return value;
+            } catch (IllegalArgumentException e) {
+                logger.error("Can't get dictionary value for column " + col.getName() + " (id = " + id + ")");
+                return "";
+            }
+        }
+    }
+
+    public String readColumnString(TblColRef col, byte[] bytes, int bytesLen) {
+        return readColumnString(col, bytes, 0, bytesLen);
+    }
+
+    private boolean isNull(byte[] bytes, int offset, int length) {
+        // all 0xFF is NULL
+        if (length == 0) {
+            return false;
+        }
+        for (int i = 0; i < bytes.length; i++) {
+            if (bytes[i + offset] != AbstractRowKeyEncoder.DEFAULT_BLANK_BYTE) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private byte[] removeFixLenPad(byte[] bytes, int offset, int length) {
+        int padCount = 0;
+        for (int i = 0; i < length; i++) {
+            if (bytes[i + offset] == RowConstants.ROWKEY_PLACE_HOLDER_BYTE) {
+                padCount++;
+            }
+        }
+
+        int size = length - padCount;
+        byte[] stripBytes = new byte[size];
+        int index = 0;
+        for (int i = 0; i < length; i++) {
+            byte vb = bytes[i + offset];
+            if (vb != RowConstants.ROWKEY_PLACE_HOLDER_BYTE) {
+                stripBytes[index++] = vb;
+            }
+        }
+        return stripBytes;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java
new file mode 100644
index 0000000..d7a48d9
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.kv;
+
+import java.util.Collection;
+import java.util.Comparator;
+
+import org.apache.kylin.metadata.model.DataType;
+
+/**
+ * @author yangli9
+ */
+abstract public class RowKeyColumnOrder implements Comparator<String> {
+
+    public static final NumberOrder NUMBER_ORDER = new NumberOrder();
+    public static final StringOrder STRING_ORDER = new StringOrder();
+
+    public static RowKeyColumnOrder getInstance(DataType type) {
+        if (type.isNumberFamily())
+            return NUMBER_ORDER;
+        else
+            return STRING_ORDER;
+    }
+
+    public String max(Collection<String> values) {
+        String max = null;
+        for (String v : values) {
+            if (max == null || compare(max, v) < 0)
+                max = v;
+        }
+        return max;
+    }
+
+    public String min(Collection<String> values) {
+        String min = null;
+        for (String v : values) {
+            if (min == null || compare(min, v) > 0)
+                min = v;
+        }
+        return min;
+    }
+
+    public String min(String v1, String v2) {
+        if (v1 == null)
+            return v2;
+        else if (v2 == null)
+            return v1;
+        else
+            return compare(v1, v2) <= 0 ? v1 : v2;
+    }
+
+    public String max(String v1, String v2) {
+        if (v1 == null)
+            return v2;
+        else if (v2 == null)
+            return v1;
+        else
+            return compare(v1, v2) >= 0 ? v1 : v2;
+    }
+
+    @Override
+    public int compare(String o1, String o2) {
+        // consider null
+        if (o1 == o2)
+            return 0;
+        if (o1 == null)
+            return -1;
+        if (o2 == null)
+            return 1;
+
+        return compareNonNull(o1, o2);
+    }
+
+    abstract int compareNonNull(String o1, String o2);
+
+    private static class StringOrder extends RowKeyColumnOrder {
+        @Override
+        public int compareNonNull(String o1, String o2) {
+            return o1.compareTo(o2);
+        }
+    }
+
+    private static class NumberOrder extends RowKeyColumnOrder {
+        @Override
+        public int compareNonNull(String o1, String o2) {
+            double d1 = Double.parseDouble(o1);
+            double d2 = Double.parseDouble(o2);
+            return Double.compare(d1, d2);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
new file mode 100644
index 0000000..1b896a0
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.kv;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kylin.common.util.SplittedBytes;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.common.RowKeySplitter;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * 
+ * @author xjiang
+ * 
+ */
+public class RowKeyDecoder {
+
+    private final CubeDesc cubeDesc;
+    private final RowKeyColumnIO colIO;
+    private final RowKeySplitter rowKeySplitter;
+
+    private Cuboid cuboid;
+    private List<String> values;
+
+    public RowKeyDecoder(CubeSegment cubeSegment) {
+        this.cubeDesc = cubeSegment.getCubeDesc();
+        this.rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 255);
+        this.colIO = new RowKeyColumnIO(cubeSegment);
+        this.values = new ArrayList<String>();
+    }
+
+    public long decode(byte[] bytes) throws IOException {
+        this.values.clear();
+
+        long cuboidId = rowKeySplitter.split(bytes, bytes.length);
+        initCuboid(cuboidId);
+
+        SplittedBytes[] splits = rowKeySplitter.getSplitBuffers();
+
+        int offset = 1; // skip cuboid id part
+
+        for (int i = 0; i < this.cuboid.getColumns().size(); i++) {
+            TblColRef col = this.cuboid.getColumns().get(i);
+            collectValue(col, splits[offset].value, splits[offset].length);
+            offset++;
+        }
+
+        return cuboidId;
+    }
+
+    private void initCuboid(long cuboidID) {
+        if (this.cuboid != null && this.cuboid.getId() == cuboidID) {
+            return;
+        }
+        this.cuboid = Cuboid.findById(cubeDesc, cuboidID);
+    }
+
+    private void collectValue(TblColRef col, byte[] valueBytes, int length) throws IOException {
+        String strValue = colIO.readColumnString(col, valueBytes, length);
+        values.add(strValue);
+    }
+
+    public RowKeySplitter getRowKeySplitter() {
+        return rowKeySplitter;
+    }
+
+    public void setCuboid(Cuboid cuboid) {
+        this.cuboid = cuboid;
+    }
+
+    public List<TblColRef> getColumns() {
+        return cuboid.getColumns();
+    }
+
+    public List<String> getValues() {
+        return values;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder buf = new StringBuilder();
+        buf.append(cuboid.getId());
+        for (Object value : values) {
+            buf.append(",");
+            buf.append(value);
+        }
+        return buf.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
new file mode 100644
index 0000000..5b6cd91
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.kv;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kylin.common.util.Bytes;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * @author George Song (ysong1)
+ */
+public class RowKeyEncoder extends AbstractRowKeyEncoder {
+
+    private int bytesLength;
+    protected int headerLength;
+    private RowKeyColumnIO colIO;
+
+    protected RowKeyEncoder(CubeSegment cubeSeg, Cuboid cuboid) {
+        super(cuboid);
+        colIO = new RowKeyColumnIO(cubeSeg);
+        bytesLength = headerLength = RowConstants.ROWKEY_CUBOIDID_LEN; // header
+        for (TblColRef column : cuboid.getColumns()) {
+            bytesLength += colIO.getColumnLength(column);
+        }
+    }
+
+    public RowKeyColumnIO getColumnIO() {
+        return colIO;
+    }
+
+    public int getColumnOffset(TblColRef col) {
+        int offset = RowConstants.ROWKEY_CUBOIDID_LEN;
+
+        for (TblColRef dimCol : cuboid.getColumns()) {
+            if (col.equals(dimCol))
+                return offset;
+            offset += colIO.getColumnLength(dimCol);
+        }
+
+        throw new IllegalArgumentException("Column " + col + " not found on cuboid " + cuboid);
+    }
+
+    public int getColumnLength(TblColRef col) {
+        return colIO.getColumnLength(col);
+    }
+
+    public int getRowKeyLength() {
+        return bytesLength;
+    }
+
+    public int getHeaderLength() {
+        return headerLength;
+    }
+
+    @Override
+    public byte[] encode(Map<TblColRef, String> valueMap) {
+        List<byte[]> valueList = new ArrayList<byte[]>();
+        for (TblColRef bdCol : cuboid.getColumns()) {
+            String value = valueMap.get(bdCol);
+            valueList.add(valueStringToBytes(value));
+        }
+        byte[][] values = valueList.toArray(RowConstants.BYTE_ARR_MARKER);
+        return encode(values);
+    }
+
+    public byte[] valueStringToBytes(String value) {
+        if (value == null)
+            return null;
+        else
+            return Bytes.toBytes(value);
+    }
+
+    @Override
+    public byte[] encode(byte[][] values) {
+        byte[] bytes = new byte[this.bytesLength];
+        int offset = fillHeader(bytes, values);
+
+        for (int i = 0; i < cuboid.getColumns().size(); i++) {
+            TblColRef column = cuboid.getColumns().get(i);
+            int colLength = colIO.getColumnLength(column);
+            byte[] value = values[i];
+            if (value == null) {
+                fillColumnValue(column, colLength, null, 0, bytes, offset);
+            } else {
+                fillColumnValue(column, colLength, value, value.length, bytes, offset);
+            }
+            offset += colLength;
+
+        }
+        return bytes;
+    }
+
+    protected int fillHeader(byte[] bytes, byte[][] values) {
+        int offset = 0;
+        System.arraycopy(cuboid.getBytes(), 0, bytes, offset, RowConstants.ROWKEY_CUBOIDID_LEN);
+        offset += RowConstants.ROWKEY_CUBOIDID_LEN;
+        if (this.headerLength != offset) {
+            throw new IllegalStateException("Expected header length is " + headerLength + ". But the offset is " + offset);
+        }
+        return offset;
+    }
+
+    protected void fillColumnValue(TblColRef column, int columnLen, byte[] value, int valueLen, byte[] outputValue, int outputValueOffset) {
+        // special null value case
+        if (value == null) {
+            byte[] valueBytes = defaultValue(columnLen);
+            System.arraycopy(valueBytes, 0, outputValue, outputValueOffset, columnLen);
+            return;
+        }
+
+        colIO.writeColumn(column, value, valueLen, this.blankByte, outputValue, outputValueOffset);
+    }
+
+    protected byte[] defaultValue(int length) {
+        byte[] values = new byte[length];
+        Arrays.fill(values, this.blankByte);
+        return values;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/kv/RowValueDecoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowValueDecoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowValueDecoder.java
new file mode 100644
index 0000000..2ab12b9
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowValueDecoder.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.kv;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Collection;
+
+/**
+ * @author xjiang
+ */
+public class RowValueDecoder implements Cloneable {
+
+    private final HBaseColumnDesc hbaseColumn;
+    private final byte[] hbaseColumnFamily;
+    private final byte[] hbaseColumnQualifier;
+
+    private final MeasureCodec codec;
+    private final BitSet projectionIndex;
+    private final MeasureDesc[] measures;
+    private Object[] values;
+
+    public RowValueDecoder(HBaseColumnDesc hbaseColumn) {
+        this.hbaseColumn = hbaseColumn;
+        this.hbaseColumnFamily = Bytes.toBytes(hbaseColumn.getColumnFamilyName());
+        this.hbaseColumnQualifier = Bytes.toBytes(hbaseColumn.getQualifier());
+        this.projectionIndex = new BitSet();
+        this.measures = hbaseColumn.getMeasures();
+        this.codec = new MeasureCodec(measures);
+        this.values = new Object[measures.length];
+    }
+
+    public void decode(Result hbaseRow) {
+        decode(hbaseRow, true);
+    }
+
+    public void decode(Result hbaseRow, boolean convertToJavaObject) {
+        decode(hbaseRow.getValueAsByteBuffer(hbaseColumnFamily, hbaseColumnQualifier), convertToJavaObject);
+    }
+
+    public void decode(byte[] bytes) {
+        decode(bytes, true);
+    }
+
+    public void decode(byte[] bytes, boolean convertToJavaObject) {
+        decode(ByteBuffer.wrap(bytes), convertToJavaObject);
+    }
+
+    private void decode(ByteBuffer buffer, boolean convertToJavaObject) {
+        codec.decode(buffer, values);
+        if (convertToJavaObject) {
+            convertToJavaObjects(values, values, convertToJavaObject);
+        }
+    }
+
+    private void convertToJavaObjects(Object[] mapredObjs, Object[] results, boolean convertToJavaObject) {
+        for (int i = 0; i < mapredObjs.length; i++) {
+            Object o = mapredObjs[i];
+
+            if (o instanceof LongWritable)
+                o = ((LongWritable) o).get();
+            else if (o instanceof IntWritable)
+                o = ((IntWritable) o).get();
+            else if (o instanceof DoubleWritable)
+                o = ((DoubleWritable) o).get();
+            else if (o instanceof FloatWritable)
+                o = ((FloatWritable) o).get();
+
+            results[i] = o;
+        }
+    }
+
+    public void setIndex(int bitIndex) {
+        projectionIndex.set(bitIndex);
+    }
+
+    public HBaseColumnDesc getHBaseColumn() {
+        return hbaseColumn;
+    }
+
+    public BitSet getProjectionIndex() {
+        return projectionIndex;
+    }
+
+    public Object[] getValues() {
+        return values;
+    }
+
+    public MeasureDesc[] getMeasures() {
+        return measures;
+    }
+
+    public boolean hasMemHungryCountDistinct() {
+        for (int i = projectionIndex.nextSetBit(0); i >= 0; i = projectionIndex.nextSetBit(i + 1)) {
+            FunctionDesc func = measures[i].getFunction();
+            if (func.isCountDistinct() && !func.isHolisticCountDistinct()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public static boolean hasMemHungryCountDistinct(Collection<RowValueDecoder> rowValueDecoders) {
+        for (RowValueDecoder decoder : rowValueDecoders) {
+            if (decoder.hasMemHungryCountDistinct())
+                return true;
+        }
+        return false;
+    }
+
+}