You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by lu...@apache.org on 2015/01/07 15:47:02 UTC

[41/51] [partial] incubator-kylin git commit: migrate repo from github.com to apache git

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/dataGen/FactTableGenerator.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/dataGen/FactTableGenerator.java b/cube/src/main/java/com/kylinolap/cube/dataGen/FactTableGenerator.java
new file mode 100644
index 0000000..de63c27
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/dataGen/FactTableGenerator.java
@@ -0,0 +1,645 @@
+package com.kylinolap.cube.dataGen;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.common.persistence.ResourceStore;
+import com.kylinolap.common.util.Array;
+import com.kylinolap.cube.CubeInstance;
+import com.kylinolap.cube.CubeManager;
+import com.kylinolap.metadata.MetadataManager;
+import com.kylinolap.metadata.model.cube.CubeDesc;
+import com.kylinolap.metadata.model.cube.DimensionDesc;
+import com.kylinolap.metadata.model.cube.JoinDesc;
+import com.kylinolap.metadata.model.cube.MeasureDesc;
+import com.kylinolap.metadata.model.cube.TblColRef;
+import com.kylinolap.metadata.model.schema.ColumnDesc;
+import com.kylinolap.metadata.model.schema.DataType;
+
+/**
+ * Created by hongbin on 5/20/14.
+ */
+public class FactTableGenerator {
+    CubeInstance cube = null;
+    CubeDesc desc = null;
+    ResourceStore store = null;
+    String factTableName = null;
+
+    GenConfig genConf = null;
+
+    Random r = null;
+
+    String cubeName;
+    long randomSeed;
+    int rowCount;
+    int unlinkableRowCount;
+    int unlinkableRowCountMax;
+    double conflictRatio;
+    double linkableRatio;
+
+    // lookup table name -> names of its key columns that are joined to the
+    // fact table (i.e. that appear as foreign keys in the fact table)
+    Hashtable<String, LinkedList<String>> lookupTableKeys = new Hashtable<String, LinkedList<String>>();
+
+    // possible values of lookupTableKeys, extracted from existing lookup
+    // tables.
+    // The key is in the format of tablename/columnname
+    HashMap<String, ArrayList<String>> feasibleValues = new HashMap<String, ArrayList<String>>();
+
+    // lookup table name -> sets of all composite keys
+    HashMap<String, HashSet<Array<String>>> lookupTableCompositeKeyValues = new HashMap<String, HashSet<Array<String>>>();
+
+    private void init(String cubeName, int rowCount, double conflictRaio, double linkableRatio, long randomSeed) {
+        this.rowCount = rowCount;
+        this.conflictRatio = conflictRaio;
+        this.cubeName = cubeName;
+        this.randomSeed = randomSeed;
+        this.linkableRatio = linkableRatio;
+
+        this.unlinkableRowCountMax = (int) (this.rowCount * (1 - linkableRatio));
+        this.unlinkableRowCount = 0;
+
+        r = new Random(randomSeed);
+
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        cube = CubeManager.getInstance(config).getCube(cubeName);
+        desc = cube.getDescriptor();
+        factTableName = desc.getFactTable();
+        store = ResourceStore.getStore(config);
+    }
+
+    // Loads the per-column value preferences from /data/data_gen_config.json.
+    // The stream is closed on every path, including a failure inside the parser.
+    private void loadConfig() {
+        try {
+            InputStream configStream = store.getResource("/data/data_gen_config.json");
+            try {
+                this.genConf = GenConfig.loadConfig(configStream);
+            } finally {
+                if (configStream != null)
+                    configStream.close();
+            }
+        } catch (IOException e) {
+            e.printStackTrace(); // genConf keeps its previous value (initially null)
+        }
+    }
+
+    private void loadLookupTableValues(String lookupTableName, LinkedList<String> columnNames, int distinctRowCount) throws Exception {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        // Samples up to distinctRowCount random rows of /data/<lookupTableName>.csv and records their key values.
+        // Composite keys additionally get a set that is used for the uniqueness check below.
+        if (columnNames.size() > 1 && !lookupTableCompositeKeyValues.containsKey(lookupTableName)) {
+            lookupTableCompositeKeyValues.put(lookupTableName, new HashSet<Array<String>>());
+        }
+
+        InputStream tableStream = null;
+        BufferedReader tableReader = null;
+        try {
+            Hashtable<String, Integer> zeroBasedIndices = new Hashtable<String, Integer>();
+            for (String columnName : columnNames) {
+                ColumnDesc cDesc = MetadataManager.getInstance(config).getTableDesc(lookupTableName).findColumnByName(columnName);
+                zeroBasedIndices.put(columnName, cDesc.getZeroBasedIndex());
+            }
+
+            String path = "/data/" + lookupTableName + ".csv";
+            tableStream = store.getResource(path);
+            tableReader = new BufferedReader(new InputStreamReader(tableStream));
+            // first pass: count the rows so that random row numbers can be drawn
+            int tableRowCount = 0;
+            int curRowNum = 0;
+            String curRow;
+
+            while (tableReader.readLine() != null)
+                tableRowCount++;
+
+            HashSet<Integer> rows = new HashSet<Integer>();
+            distinctRowCount = (distinctRowCount < tableRowCount) ? distinctRowCount : tableRowCount;
+            while (rows.size() < distinctRowCount) {
+                rows.add(r.nextInt(tableRowCount));
+            }
+
+            // reopen the stream for the second pass
+            tableReader.close();
+            tableStream.close();
+            tableStream = null;
+            tableReader = null;
+
+            tableStream = store.getResource(path);
+            tableReader = new BufferedReader(new InputStreamReader(tableStream));
+
+            while ((curRow = tableReader.readLine()) != null) {
+                if (rows.contains(curRowNum)) {
+                    String[] tokens = curRow.split(",", -1); // limit -1 keeps trailing empty columns
+
+                    String[] comboKeys = null;
+                    int index = 0;
+                    if (columnNames.size() > 1)
+                        comboKeys = new String[columnNames.size()];
+
+                    for (String columnName : columnNames) {
+                        int zeroBasedIndex = zeroBasedIndices.get(columnName);
+                        if (!feasibleValues.containsKey(lookupTableName + "/" + columnName))
+                            feasibleValues.put(lookupTableName + "/" + columnName, new ArrayList<String>());
+                        feasibleValues.get(lookupTableName + "/" + columnName).add(tokens[zeroBasedIndex]);
+
+                        if (columnNames.size() > 1) {
+                            comboKeys[index] = tokens[zeroBasedIndex];
+                            index++;
+                        }
+                    }
+
+                    if (columnNames.size() > 1) {
+                        Array<String> wrap = new Array<String>(comboKeys);
+                        if (lookupTableCompositeKeyValues.get(lookupTableName).contains(wrap)) {
+                            throw new Exception("The composite key already exist in the lookup table");
+                        }
+                        lookupTableCompositeKeyValues.get(lookupTableName).add(wrap);
+                    }
+                }
+                curRowNum++;
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+            System.exit(1);
+        } finally {
+            // runs on every exit path, e.g. when the duplicate-key exception above is thrown
+            if (tableReader != null)
+                tableReader.close();
+            if (tableStream != null)
+                tableStream.close();
+        }
+    }
+
+    // prepare the candidate values for each joined column
+    private void prepare() throws Exception {
+        // load config
+        loadConfig();
+
+        HashSet<String> factTableColumns = new HashSet<String>();
+
+        for (DimensionDesc dim : desc.getDimensions()) {
+            for (TblColRef col : dim.getColumnRefs()) {
+                if (col.getTable().equals(factTableName))
+                    factTableColumns.add(col.getName());
+            }
+
+            JoinDesc join = dim.getJoin();
+            if (join != null) {
+                String lookupTable = dim.getTable();
+                for (String column : join.getPrimaryKey()) {
+                    if (!lookupTableKeys.containsKey(lookupTable)) {
+                        lookupTableKeys.put(lookupTable, new LinkedList<String>());
+                    }
+
+                    if (!lookupTableKeys.get(lookupTable).contains(column))
+                        lookupTableKeys.get(lookupTable).add(column);
+                }
+            }
+        }
+
+        int distinctRowCount = (int) (this.rowCount / this.conflictRatio);
+        distinctRowCount = (distinctRowCount == 0) ? 1 : distinctRowCount;
+        // lookup tables
+        for (String lookupTable : lookupTableKeys.keySet()) {
+            this.loadLookupTableValues(lookupTable, lookupTableKeys.get(lookupTable), distinctRowCount);
+        }
+    }
+
+    private List<DimensionDesc> getSortedDimentsionDescs() {
+        List<DimensionDesc> dimensions = new ArrayList<DimensionDesc>(desc.getDimensions()); // sort a copy, not the list handed out by desc
+        Collections.sort(dimensions, new Comparator<DimensionDesc>() {
+            @Override
+            public int compare(DimensionDesc o1, DimensionDesc o2) {
+                int pk1 = o1.getJoin() != null ? o1.getJoin().getPrimaryKey().length : 0;
+                int pk2 = o2.getJoin() != null ? o2.getJoin().getPrimaryKey().length : 0;
+                return Integer.valueOf(pk2).compareTo(pk1); // descending: joins with the most key columns first
+            }
+        });
+        return dimensions;
+    }
+
+    private void execute(String joinType) throws Exception {
+        // main logic here: generate the data into a temp file
+        File tempFile = new File(generate());
+        String factTablePath = "/data/" + factTableName + ".csv";
+        try {
+            replaceResource(factTablePath, tempFile);
+            // duplicate a copy of this fact table whose name carries the join type,
+            // so that later test cases can select different data files
+            replaceResource(factTablePath + "." + joinType.toLowerCase(), tempFile);
+        } finally {
+            tempFile.delete(); // the temp file is removed even when an upload fails
+        }
+
+        System.out.println();
+        System.out.println("The new fact table has been written to $KYLIN_METADATA_URL" + factTablePath);
+        System.out.println();
+    }
+
+    // Replaces the resource at resPath in the store with the content of file;
+    // the input stream is closed on every path, including a failing delete/put.
+    private void replaceResource(String resPath, File file) throws Exception {
+        InputStream in = new FileInputStream(file);
+        try {
+            store.deleteResource(resPath);
+            store.putResource(resPath, in, System.currentTimeMillis());
+        } finally {
+            in.close();
+        }
+    }
+
+    /**
+     * Generate the fact table and put it into a temp file
+     *
+     * @return
+     * @throws Exception
+     */
+    private String generate() throws Exception {
+        // the columns on the fact table can be classified into three groups:
+        // 1. foreign keys
+        HashMap<String, String> factTableCol2LookupCol = new HashMap<String, String>();
+        // 2. metrics or directly used dimensions
+        HashSet<String> usedCols = new HashSet<String>();
+        // 3. others, not referenced anywhere
+
+        HashMap<String, String> lookupCol2factTableCol = new HashMap<String, String>();
+
+        // find fact table columns in fks
+        List<DimensionDesc> dimensions = getSortedDimentsionDescs();
+        for (DimensionDesc dim : dimensions) {
+            JoinDesc jDesc = dim.getJoin();
+            if (jDesc != null) {
+                String[] fks = jDesc.getForeignKey();
+                String[] pks = jDesc.getPrimaryKey();
+                int num = fks.length;
+                for (int i = 0; i < num; ++i) {
+                    String value = dim.getTable() + "/" + pks[i];
+
+                    lookupCol2factTableCol.put(value, fks[i]);
+
+                    if (factTableCol2LookupCol.containsKey(fks[i])) {
+                        if (!factTableCol2LookupCol.get(fks[i]).equals(value)) {
+                            System.out.println("Warning: Disambiguation on the mapping of column " + fks[i] + ", " + factTableCol2LookupCol.get(fks[i]) + "(chosen) or " + value);
+                            continue;
+                        }
+                    }
+                    factTableCol2LookupCol.put(fks[i], value);
+                }
+            }
+            // else: dimensions without a join are handled in the next round (loop below)
+        }
+
+        // find fact table columns in direct dimension
+        // DO NOT merge this with the previous loop
+        for (DimensionDesc dim : dimensions) {
+            JoinDesc jDesc = dim.getJoin();
+            if (jDesc == null) {
+                // column on fact table used directly as a dimension
+                if (!factTableCol2LookupCol.containsKey(dim.getColumn()))
+                    usedCols.add(dim.getColumn());
+            }
+        }
+
+        // find fact table columns in measures
+        for (MeasureDesc mDesc : desc.getMeasures()) {
+            List<TblColRef> pcols = mDesc.getFunction().getParameter().getColRefs();
+            if (pcols != null) {
+                for (TblColRef col : pcols) {
+                    if (!factTableCol2LookupCol.containsKey(col.getName()))
+                        usedCols.add(col.getName());
+                }
+            }
+        }
+
+        return createTable(this.rowCount, factTableCol2LookupCol, lookupCol2factTableCol, usedCols);
+    }
+
+    private String normToTwoDigits(int v) {
+        if (v < 10)
+            return "0" + v;
+        else
+            return Integer.toString(v);
+    }
+
+    private String randomPick(ArrayList<String> candidates) {
+        int index = r.nextInt(candidates.size());
+        return candidates.get(index);
+    }
+
+    // Draws a random value between the two bounds in range, formatted according to the column's
+    // data type; string columns are rejected and dates are moved to the first day of their week.
+    private String createRandomCell(ColumnDesc cDesc, ArrayList<String> range) throws Exception {
+        DataType type = cDesc.getType();
+        if (type.isStringFamily()) {
+            throw new Exception("Can't handle range values for string");
+        } else if (type.isIntegerFamily()) {
+            int low = Integer.parseInt(range.get(0));
+            int high = Integer.parseInt(range.get(1));
+            if (high <= low) // Random.nextInt(bound) only accepts a positive bound
+                throw new Exception("Invalid range [" + low + ", " + high + ") for column " + cDesc.getName());
+            return Integer.toString(r.nextInt(high - low) + low);
+        } else if (type.isDouble()) {
+            double low = Double.parseDouble(range.get(0));
+            double high = Double.parseDouble(range.get(1));
+            return String.format("%.4f", r.nextDouble() * (high - low) + low);
+        } else if (type.isFloat()) {
+            float low = Float.parseFloat(range.get(0));
+            float high = Float.parseFloat(range.get(1));
+            return String.format("%.4f", r.nextFloat() * (high - low) + low);
+
+        } else if (type.isDecimal()) {
+            double low = Double.parseDouble(range.get(0));
+            double high = Double.parseDouble(range.get(1));
+            return String.format("%.4f", r.nextDouble() * (high - low) + low);
+
+        } else if (type.isDateTimeFamily()) {
+            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
+            Date start = format.parse(range.get(0));
+            Date end = format.parse(range.get(1));
+            long diff = end.getTime() - start.getTime();
+            Date temp = new Date(start.getTime() + (long) (diff * r.nextDouble()));
+            Calendar cal = Calendar.getInstance();
+            cal.setTime(temp);
+            // first day
+            cal.set(Calendar.DAY_OF_WEEK, cal.getFirstDayOfWeek());
+
+            return cal.get(Calendar.YEAR) + "-" + normToTwoDigits(cal.get(Calendar.MONTH) + 1) + "-" + normToTwoDigits(cal.get(Calendar.DAY_OF_MONTH));
+        } else {
+            System.out.println("The data type " + type + " is not recognized");
+            System.exit(1);
+        }
+        return null;
+    }
+
+    // Creates an unconstrained random value for the column, chosen by its lower-cased type name.
+    private String createRandomCell(ColumnDesc cDesc) {
+        String type = cDesc.getTypeName();
+        String s = type.toLowerCase();
+        if (s.equals("string") || s.equals("char") || s.equals("varchar")) {
+            StringBuilder sb = new StringBuilder();
+            for (int i = 0; i < 2; i++) {
+                sb.append((char) ('a' + r.nextInt(10)));// there are 10*10
+                // possible strings
+            }
+            return sb.toString();
+        } else if (s.equals("bigint") || s.equals("int") || s.equals("tinyint") || s.equals("smallint")) {
+            return Integer.toString(r.nextInt(128));
+        } else if (s.equals("double")) {
+            return String.format("%.4f", r.nextDouble() * 100);
+        } else if (s.equals("float")) {
+            return String.format("%.4f", r.nextFloat() * 100);
+        } else if (s.equals("decimal")) {
+            return String.format("%.4f", r.nextDouble() * 100);
+        } else if (s.equals("date")) {
+            long date20131231 = 1388448000000L; // 2013-12-31T00:00:00Z in epoch milliseconds
+            long date20010101 = 978307200000L; // 2001-01-01T00:00:00Z in epoch milliseconds
+            long diff = date20131231 - date20010101;
+            Date temp = new Date(date20010101 + (long) (diff * r.nextDouble()));
+            Calendar cal = Calendar.getInstance();
+            cal.setTime(temp);
+            // first day
+            cal.set(Calendar.DAY_OF_WEEK, cal.getFirstDayOfWeek());
+            return cal.get(Calendar.YEAR) + "-" + normToTwoDigits(cal.get(Calendar.MONTH) + 1) + "-" + normToTwoDigits(cal.get(Calendar.DAY_OF_MONTH));
+        } else {
+            System.out.println("The data type " + type + " is not recognized");
+            System.exit(1);
+        }
+        return null;
+    }
+
+    // Returns the fixed placeholder written into fact table columns that are neither
+    // foreign keys nor used by the cube: "abcde" for strings, "0" for numbers and
+    // "1970-01-01" for dates. An unrecognized type name terminates the JVM.
+    private String createDefaultsCell(String type) {
+        String s = type.toLowerCase();
+        if (s.equals("string") || s.equals("char") || s.equals("varchar")) {
+            return "abcde";
+        } else if (s.equals("bigint") || s.equals("int") || s.equals("tinyint") || s.equals("smallint")) {
+            return "0";
+        } else if (s.equals("double") || s.equals("float") || s.equals("decimal")) {
+            return "0";
+        } else if (s.equals("date")) {
+            return "1970-01-01";
+        } else {
+            // unknown type: report it and abort the whole generator run
+            System.out.println("The data type " + type + " is not recognized");
+            System.exit(1);
+        }
+        return null;
+    }
+
+    private void printColumnMappings(HashMap<String, String> factTableCol2LookupCol, HashSet<String> usedCols, HashSet<String> defaultColumns) {
+
+        System.out.println("=======================================================================");
+        System.out.format("%-30s %s", "FACT_TABLE_COLUMN", "MAPPING");
+        System.out.println();
+        System.out.println();
+        for (Map.Entry<String, String> entry : factTableCol2LookupCol.entrySet()) {
+            System.out.format("%-30s %s", entry.getKey(), entry.getValue());
+            System.out.println();
+        }
+        for (String key : usedCols) {
+            System.out.format("%-30s %s", key, "Random Values");
+            System.out.println();
+        }
+        for (String key : defaultColumns) {
+            System.out.format("%-30s %s", key, "Default Values");
+            System.out.println();
+        }
+        System.out.println("=======================================================================");
+
+        System.out.println("Parameters:");
+        System.out.println();
+        System.out.println("CubeName:        " + cubeName);
+        System.out.println("RowCount:        " + rowCount);
+        System.out.println("ConflictRatio:   " + conflictRatio);
+        System.out.println("LinkableRatio:   " + linkableRatio);
+        System.out.println("Seed:            " + randomSeed);
+        System.out.println();
+        System.out.println("The number of actual unlinkable fact rows is: " + this.unlinkableRowCount);
+        System.out.println("You can vary the above parameters to generate different datasets.");
+        System.out.println();
+    }
+
+    // Any row in the column must finally appear in the flatten big table.
+    // for single-column joins the generated row is guaranteed to have a match
+    // in lookup table
+    // for composite keys we'll need an extra check
+    private boolean matchAllCompositeKeys(HashMap<String, String> lookupCol2FactTableCol, LinkedList<String> columnValues) {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+
+        for (String lookupTable : lookupTableKeys.keySet()) {
+            if (lookupTableKeys.get(lookupTable).size() == 1)
+                continue;
+
+            String[] comboKey = new String[lookupTableKeys.get(lookupTable).size()];
+            int index = 0;
+            for (String column : lookupTableKeys.get(lookupTable)) {
+                String key = lookupTable + "/" + column;
+                String factTableCol = lookupCol2FactTableCol.get(key);
+                int cardinal = MetadataManager.getInstance(config).getTableDesc(factTableName).findColumnByName(factTableCol).getZeroBasedIndex();
+                comboKey[index] = columnValues.get(cardinal);
+
+                index++;
+            }
+            Array<String> wrap = new Array<String>(comboKey);
+            if (!lookupTableCompositeKeyValues.get(lookupTable).contains(wrap)) {
+                // System.out.println("Try " + wrap + " Failed, continue...");
+                return false;
+            }
+        }
+        return true;
+    }
+
+    // Creates one cell: a random value, a pick from the configured value set, or a value from the configured range.
+    private String createCell(ColumnDesc cDesc) throws Exception {
+        ColumnConfig cConfig = null;
+        // genConf is null when the config could not be read or parsed; every column then counts as unconfigured
+        if (genConf == null || (cConfig = genConf.getColumnConfigByName(cDesc.getName())) == null) {
+            // if the column is not configured, use random values
+            return (createRandomCell(cDesc));
+
+        } else {
+            // the column has a configuration
+            if (!cConfig.isAsRange() && !cConfig.isExclusive() && r.nextBoolean()) {
+                // if the column still allows random values
+                return (createRandomCell(cDesc));
+
+            } else {
+                // use specified values
+                ArrayList<String> valueSet = cConfig.getValueSet();
+                if (valueSet == null || valueSet.size() == 0)
+                    throw new Exception("Did you forget to specify value set for " + cDesc.getName());
+
+                if (!cConfig.isAsRange()) {
+                    return (randomPick(valueSet));
+                } else {
+                    if (valueSet.size() != 2)
+                        throw new Exception("Only two values can be set for range values, the column: " + cDesc.getName());
+
+                    return (createRandomCell(cDesc, valueSet));
+                }
+            }
+        }
+    }
+
+    private LinkedList<String> createRow(HashMap<String, String> factTableCol2LookupCol, HashSet<String> usedCols, HashSet<String> defaultColumns) throws Exception {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        LinkedList<String> columnValues = new LinkedList<String>();
+
+        for (ColumnDesc cDesc : MetadataManager.getInstance(config).getTableDesc(factTableName).getColumns()) {
+
+
+            String colName = cDesc.getName();
+
+            if (factTableCol2LookupCol.containsKey(colName)) {
+
+                // if the current column is a fk column in fact table
+                ArrayList<String> candidates = this.feasibleValues.get(factTableCol2LookupCol.get(colName));
+
+                columnValues.add(candidates.get(r.nextInt(candidates.size())));
+            } else if (usedCols.contains(colName)) {
+
+                // if the current column is a metric column in fact table
+                columnValues.add(createCell(cDesc));
+            } else {
+
+                // otherwise this column is not useful in OLAP
+                columnValues.add(createDefaultsCell(cDesc.getTypeName()));
+                defaultColumns.add(colName);
+            }
+        }
+
+        return columnValues;
+    }
+
+    // Writes rowCount generated rows as CSV lines into a temp file and returns its absolute path.
+    private String createTable(int rowCount, HashMap<String, String> factTableCol2LookupCol, HashMap<String, String> lookupCol2FactTableCol, HashSet<String> usedCols) throws Exception {
+        try {
+            File tempFile = File.createTempFile("ftg", ".tmp");
+            BufferedWriter writer = new BufferedWriter(new FileWriter(tempFile));
+            HashSet<String> defaultColumns = new HashSet<String>();
+            try {
+                for (int i = 0; i < rowCount; ) {
+                    LinkedList<String> columnValues = createRow(factTableCol2LookupCol, usedCols, defaultColumns);
+
+                    if (!matchAllCompositeKeys(lookupCol2FactTableCol, columnValues)) {
+                        // rows that miss a composite key are kept only while the unlinkable budget lasts
+                        if (unlinkableRowCount < unlinkableRowCountMax) {
+                            unlinkableRowCount++;
+                        } else {
+                            continue; // discard the row and retry; i is not advanced
+                        }
+                    }
+
+                    StringBuilder sb = new StringBuilder();
+                    for (String c : columnValues)
+                        sb.append(c).append(',');
+                    sb.deleteCharAt(sb.length() - 1);
+                    writer.write(sb.toString());
+                    writer.newLine();
+                    i++;
+                }
+            } finally {
+                writer.close(); // flushes, and releases the file even when row generation throws
+            }
+
+            printColumnMappings(factTableCol2LookupCol, usedCols, defaultColumns);
+
+            return tempFile.getAbsolutePath();
+
+        } catch (IOException e) {
+            e.printStackTrace();
+            System.exit(1);
+        }
+
+        return null;
+    }
+
+    /**
+     * Randomly create a fact table and put it to test_kylin_data table in hbase
+     *
+     * @param cubeName      name of the cube
+     * @param rowCount      expected row count generated
+     * @param linkableRatio the percentage of fact table rows that can be linked with all
+     *                      lookup table by INNER join
+     * @param randomSeed    random seed
+     */
+    public static void generate(String cubeName, String rowCount, String linkableRatio, String randomSeed, String joinType) throws Exception {
+
+        if (cubeName == null)
+            cubeName = "test_kylin_cube_with_slr_ready";
+        if (rowCount == null)
+            rowCount = "10000";
+        if (linkableRatio == null)
+            linkableRatio = "0.6";
+
+        //if (randomSeed == null)
+        // don't give it value
+
+        // String conflictRatio = "5";//this parameter do not allow configuring
+        // any more
+
+        FactTableGenerator generator = new FactTableGenerator();
+        long seed;
+        if (randomSeed != null) {
+            seed = Long.parseLong(randomSeed);
+        } else {
+            Random r = new Random();
+            seed = r.nextLong();
+        }
+
+        generator.init(cubeName, Integer.parseInt(rowCount), 5, Double.parseDouble(linkableRatio), seed);
+        generator.prepare();
+        generator.execute(joinType);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/dataGen/GenConfig.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/dataGen/GenConfig.java b/cube/src/main/java/com/kylinolap/cube/dataGen/GenConfig.java
new file mode 100644
index 0000000..a877c63
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/dataGen/GenConfig.java
@@ -0,0 +1,63 @@
+package com.kylinolap.cube.dataGen;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.kylinolap.common.util.JsonUtil;
+
+/**
+ * Created by honma on 5/29/14.
+ */
+@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
+public class GenConfig {
+
+    @JsonProperty("columnConfigs")
+    private ArrayList<ColumnConfig> columnConfigs;
+
+    private HashMap<String, ColumnConfig> cache = new HashMap<String, ColumnConfig>();
+
+    public ArrayList<ColumnConfig> getColumnConfigs() {
+        return columnConfigs;
+    }
+
+    public void setColumnConfigs(ArrayList<ColumnConfig> columnConfigs) {
+        this.columnConfigs = columnConfigs;
+    }
+
+    public ColumnConfig getColumnConfigByName(String columnName) {
+        columnName = columnName.toLowerCase();
+        if (cache.containsKey(columnName)) // misses are cached as null, hence containsKey
+            return cache.get(columnName);
+        if (columnConfigs != null) { // the field has no initializer, so it can still be unset here
+            for (ColumnConfig cConfig : columnConfigs) {
+                if (cConfig.getColumnName().toLowerCase().equals(columnName)) {
+                    cache.put(columnName, cConfig);
+                    return cConfig;
+                }
+            }
+        }
+        cache.put(columnName, null);
+        return null;
+    }
+
+    public static GenConfig loadConfig(InputStream stream) {
+        try {
+            GenConfig config = JsonUtil.readValue(stream, GenConfig.class);
+            return config;
+        } catch (JsonMappingException e) {
+            e.printStackTrace();
+        } catch (JsonParseException e) {
+            e.printStackTrace();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/estimation/CubeSizeEstimationCLI.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/estimation/CubeSizeEstimationCLI.java b/cube/src/main/java/com/kylinolap/cube/estimation/CubeSizeEstimationCLI.java
new file mode 100644
index 0000000..494f1b0
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/estimation/CubeSizeEstimationCLI.java
@@ -0,0 +1,163 @@
+package com.kylinolap.cube.estimation;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.cube.CubeInstance;
+import com.kylinolap.cube.CubeManager;
+import com.kylinolap.cube.cuboid.Cuboid;
+import com.kylinolap.cube.cuboid.CuboidScheduler;
+import com.kylinolap.metadata.model.cube.CubeDesc;
+import com.kylinolap.metadata.model.cube.DimensionDesc;
+import com.kylinolap.metadata.model.cube.HierarchyDesc;
+import com.kylinolap.metadata.model.cube.MeasureDesc;
+import com.kylinolap.metadata.model.cube.RowKeyColDesc;
+import com.kylinolap.metadata.model.cube.RowKeyDesc;
+import com.kylinolap.metadata.model.schema.DataType;
+
+/**
+ * Created by honma on 9/1/14.
+ *
+ * Estimates the storage size of a cube from per-column cardinalities by
+ * walking every cuboid reachable from the base cuboid and summing
+ * {@code rowCount * (rowKeyBytes + measureBytes)} for each of them.
+ * All arithmetic saturates at {@link Long#MAX_VALUE} instead of wrapping.
+ */
+public class CubeSizeEstimationCLI {
+
+    /**
+     * Row key columns split into hierarchy groups and stand-alone columns,
+     * each identified by its bit index within a cuboid ID.
+     */
+    public static class RowKeyColInfo {
+        public List<List<Integer>> hierachyColBitIndice;
+        public List<Integer> nonHierachyColBitIndice;
+    }
+
+    /**
+     * Estimates the total size of all cuboids of a cube.
+     *
+     * @param cubeName    name of the cube to look up
+     * @param cardinality cardinality of each row key column, indexed by the
+     *                    column's bit index
+     * @return the estimated size, capped at {@link Long#MAX_VALUE}
+     * @throws IllegalArgumentException if the cube lookup yields null
+     */
+    public static long estimatedCubeSize(String cubeName, long[] cardinality) {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        CubeManager cubeManager = CubeManager.getInstance(config);
+        CubeInstance cubeInstance = cubeManager.getCube(cubeName);
+        if (cubeInstance == null) {
+            // fail with a clear message rather than a bare NullPointerException
+            throw new IllegalArgumentException("Cube '" + cubeName + "' not found");
+        }
+        CubeDesc cubeDesc = cubeInstance.getDescriptor();
+
+        // These depend only on the cube description and the cardinalities,
+        // so compute them once instead of once per cuboid.
+        RowKeyColInfo rowKeyColInfo = extractRowKeyInfo(cubeDesc);
+        int[] rowKeySpaces = estimateRowKeyColSpace(cubeDesc.getRowkey(), cardinality);
+        int measureSpace = getMeasureSpace(cubeDesc);
+
+        CuboidScheduler scheduler = new CuboidScheduler(cubeDesc);
+        long baseCuboid = Cuboid.getBaseCuboidId(cubeDesc);
+        LinkedList<Long> cuboidQueue = new LinkedList<Long>();
+        cuboidQueue.push(baseCuboid);
+
+        long totalSpace = 0;
+
+        while (!cuboidQueue.isEmpty()) {
+            long cuboidID = cuboidQueue.pop();
+            Collection<Long> spanningCuboid = scheduler.getSpanningCuboid(cuboidID);
+            for (Long sc : spanningCuboid) {
+                cuboidQueue.push(sc);
+            }
+
+            totalSpace = saturatedAdd(totalSpace, estimateCuboidSpace(cuboidID, cardinality, rowKeyColInfo, rowKeySpaces, measureSpace));
+        }
+        return totalSpace;
+    }
+
+    /**
+     * Estimates the size of a single cuboid.
+     *
+     * @param cuboidID    bit mask of the row key columns present in the cuboid
+     * @param cardinality cardinality of each row key column, indexed by bit index
+     * @param cubeDesc    description of the cube the cuboid belongs to
+     * @return the estimated size, capped at {@link Long#MAX_VALUE}
+     */
+    public static long estimateCuboidSpace(long cuboidID, long[] cardinality, CubeDesc cubeDesc) {
+        RowKeyColInfo rowKeyColInfo = extractRowKeyInfo(cubeDesc);
+        int[] rowKeySpaces = estimateRowKeyColSpace(cubeDesc.getRowkey(), cardinality);
+        int measureSpace = getMeasureSpace(cubeDesc);
+        return estimateCuboidSpace(cuboidID, cardinality, rowKeyColInfo, rowKeySpaces, measureSpace);
+    }
+
+    // Core estimate for one cuboid, working on the pre-computed per-cube values.
+    private static long estimateCuboidSpace(long cuboidID, long[] cardinality, RowKeyColInfo rowKeyColInfo, int[] rowKeySpaces, int measureSpace) {
+        long rowCount = 1;
+        int dimensionSpace = 0;
+
+        for (List<Integer> hlist : rowKeyColInfo.hierachyColBitIndice) {
+            // for hierarchy columns, the cardinality of the most detailed
+            // column present in the cuboid dominates.
+            int i;
+            for (i = 0; i < hlist.size() && rowKeyColExists(hlist.get(i), cuboidID); ++i) {
+                dimensionSpace += rowKeySpaces[hlist.get(i)];
+            }
+
+            if (i != 0)
+                rowCount = saturatedMultiply(rowCount, cardinality[hlist.get(i - 1)]);
+        }
+
+        for (int index : rowKeyColInfo.nonHierachyColBitIndice) {
+            if (rowKeyColExists(index, cuboidID)) {
+                rowCount = saturatedMultiply(rowCount, cardinality[index]);
+                dimensionSpace += rowKeySpaces[index];
+            }
+        }
+        return saturatedMultiply(rowCount, dimensionSpace + measureSpace);
+    }
+
+    // Multiplies two values; for non-negative operands the result is capped at
+    // Long.MAX_VALUE instead of wrapping around. Negative operands (not
+    // expected here) are multiplied as before.
+    private static long saturatedMultiply(long a, long b) {
+        if (a > 0 && b > 0 && a > Long.MAX_VALUE / b)
+            return Long.MAX_VALUE;
+        return a * b;
+    }
+
+    // Adds two values; for non-negative operands the result is capped at
+    // Long.MAX_VALUE instead of wrapping around.
+    private static long saturatedAdd(long a, long b) {
+        long sum = a + b;
+        if (a >= 0 && b >= 0 && sum < 0)
+            return Long.MAX_VALUE;
+        return sum;
+    }
+
+    // Returns the byte width of every row key column, indexed by bit index.
+    // Bit index i maps to the i-th row key column counted from the end.
+    private static int[] estimateRowKeyColSpace(RowKeyDesc rowKeyDesc, long[] cardinality) {
+        RowKeyColDesc[] rowKeyColDescs = rowKeyDesc.getRowKeyColumns();
+        int[] ret = new int[rowKeyColDescs.length];
+        for (int i = 0; i < rowKeyColDescs.length; ++i) {
+            RowKeyColDesc rowKeyColDesc = rowKeyColDescs[rowKeyColDescs.length - 1 - i];
+            if (rowKeyColDesc.getDictionary() == null) {
+                if (rowKeyColDesc.getLength() == 0)
+                    throw new IllegalStateException("The non-dictionary col " + rowKeyColDesc.getColumn() + " has length of 0");
+                ret[i] = rowKeyColDesc.getLength();
+            } else {
+                ret[i] = estimateDictionaryColSpace(cardinality[i]);
+            }
+        }
+        return ret;
+    }
+
+    // TODO what if it's date dictionary?
+    // Returns the number of whole bytes needed to hold the highest set bit of
+    // the given cardinality.
+    private static int estimateDictionaryColSpace(long cardinality) {
+        if (cardinality == 0)
+            throw new IllegalStateException("the cardinality is 0");
+
+        int significantBits = Long.SIZE - Long.numberOfLeadingZeros(cardinality);
+        return (significantBits + 7) / 8;
+    }
+
+    // Sums the space estimate of every measure's return type.
+    private static int getMeasureSpace(CubeDesc cubeDesc) {
+        int space = 0;
+        for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
+            DataType returnType = measureDesc.getFunction().getReturnDataType();
+            space += returnType.getSpaceEstimate();
+        }
+        return space;
+    }
+
+    // Tells whether the column at the given bit index is part of the cuboid.
+    private static boolean rowKeyColExists(int bitIndex, long cuboidID) {
+        long mask = 1L << bitIndex;
+        return (cuboidID & mask) != 0;
+    }
+
+    // Splits the row key columns into hierarchy groups (in hierarchy order)
+    // and the remaining stand-alone columns.
+    private static RowKeyColInfo extractRowKeyInfo(CubeDesc cubeDesc) {
+        RowKeyDesc rowKeyDesc = cubeDesc.getRowkey();
+        RowKeyColInfo info = new RowKeyColInfo();
+        info.hierachyColBitIndice = new ArrayList<List<Integer>>();
+        info.nonHierachyColBitIndice = new ArrayList<Integer>();
+        HashSet<Integer> heirachyIndexSet = new HashSet<Integer>();
+
+        for (DimensionDesc dim : cubeDesc.getDimensions()) {
+            if (dim.getHierarchy() != null) {
+                // accessed by position in estimateCuboidSpace, hence array-backed
+                List<Integer> hlist = new ArrayList<Integer>();
+                for (HierarchyDesc hierarchyDesc : dim.getHierarchy()) {
+                    int index = rowKeyDesc.getColumnBitIndex(hierarchyDesc.getColumnRef());
+                    hlist.add(index);
+                    heirachyIndexSet.add(index);
+                }
+                info.hierachyColBitIndice.add(hlist);
+            }
+        }
+
+        for (int i = 0; i < rowKeyDesc.getRowKeyColumns().length; ++i) {
+            if (!heirachyIndexSet.contains(i)) {
+                info.nonHierachyColBitIndice.add(i);
+            }
+        }
+
+        return info;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/exception/CubeIntegrityException.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/exception/CubeIntegrityException.java b/cube/src/main/java/com/kylinolap/cube/exception/CubeIntegrityException.java
new file mode 100644
index 0000000..a0ab338
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/exception/CubeIntegrityException.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.cube.exception;
+
+/**
+ * Checked exception carrying a plain description of a cube integrity problem.
+ *
+ * @author xduo
+ */
+public class CubeIntegrityException extends Exception {
+
+    private static final long serialVersionUID = -7924187859607404390L;
+
+    /**
+     * Creates the exception with the given detail message.
+     *
+     * @param string the detail message, later available through
+     *               {@link #getMessage()}
+     */
+    public CubeIntegrityException(String string) {
+        super(string);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/invertedindex/BitMapContainer.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/BitMapContainer.java b/cube/src/main/java/com/kylinolap/cube/invertedindex/BitMapContainer.java
new file mode 100644
index 0000000..f1af35e
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/invertedindex/BitMapContainer.java
@@ -0,0 +1,190 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.cube.invertedindex;
+
+import it.uniroma3.mat.extendedset.intset.ConciseSet;
+
+import java.nio.ByteBuffer;
+import java.nio.IntBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import com.kylinolap.common.util.BytesUtil;
+import com.kylinolap.dict.Dictionary;
+
+/**
+ * Column container that keeps one bitmap of row positions per distinct
+ * column value. {@code sets[v]} holds the rows whose value is {@code v} for
+ * {@code v} in {@code [0, nValues)}, and {@code sets[nValues]} holds the rows
+ * whose value is the dictionary NULL id.
+ *
+ * The scratch buffer used by {@link #getValueAt} is shared between calls, so
+ * the bytes handed out are overwritten by the next call.
+ *
+ * @author yangli9
+ */
+public class BitMapContainer implements ColumnValueContainer {
+
+    int valueLen;
+    int nValues;
+    int size;
+    ConciseSet[] sets; // stays null until the first append() or fromBytes()
+    boolean closedForChange;
+
+    transient byte[] temp;
+
+    /**
+     * @param info supplies the byte length and the maximum id of the column
+     * @param col  index of the column this container stores
+     */
+    public BitMapContainer(TableRecordInfoDigest info, int col) {
+        this.valueLen = info.length(col);
+        this.size = 0;
+        this.nValues = info.getMaxID(col) + 1;
+        this.sets = null;
+        this.closedForChange = false;
+
+        this.temp = new byte[valueLen];
+    }
+
+    /**
+     * Appends the value encoded in the first {@code valueLen} bytes of the
+     * given writable as the next row.
+     */
+    @Override
+    public void append(ImmutableBytesWritable valueBytes) {
+        int value = BytesUtil.readUnsigned(valueBytes.get(), valueBytes.getOffset(), valueLen);
+        append(value);
+    }
+
+    /**
+     * Appends a value id as the next row.
+     *
+     * @param value the value id, or the dictionary NULL id for this value length
+     * @throws IllegalStateException if the container is closed for change
+     */
+    public void append(int value) {
+        checkUpdateMode();
+        if (value == Dictionary.NULL_ID[valueLen]) {
+            value = nValues; // set[nValues] holds NULL
+        }
+        sets[value].add(size);
+        size++;
+    }
+
+    /**
+     * Writes the value of row {@code i} into the shared scratch buffer and
+     * points {@code valueBytes} at it.
+     */
+    @Override
+    public void getValueAt(int i, ImmutableBytesWritable valueBytes) {
+        int value = getValueIntAt(i);
+        BytesUtil.writeUnsigned(value, temp, 0, valueLen);
+        valueBytes.set(temp, 0, valueLen);
+    }
+
+    /**
+     * Finds the value id of row {@code i} by probing the bitmaps in id order.
+     *
+     * @return the value id, or the dictionary NULL id if no bitmap in
+     *         {@code [0, nValues)} contains the row
+     */
+    public int getValueIntAt(int i) {
+        for (int v = 0; v < nValues; v++) {
+            if (sets[v].contains(i)) {
+                return v;
+            }
+        }
+        // if v is not in [0..nValues-1], then it must be nValue (NULL)
+        return Dictionary.NULL_ID[valueLen];
+    }
+
+    // Rejects updates on a closed container and allocates the bitmaps on
+    // first use.
+    private void checkUpdateMode() {
+        if (isClosedForChange()) {
+            throw new IllegalStateException();
+        }
+        if (sets == null) {
+            sets = new ConciseSet[nValues + 1];
+            for (int i = 0; i <= nValues; i++) {
+                sets[i] = new ConciseSet();
+            }
+        }
+    }
+
+    private boolean isClosedForChange() {
+        return closedForChange;
+    }
+
+    @Override
+    public void closeForChange() {
+        closedForChange = true;
+    }
+
+    @Override
+    public int getSize() {
+        return size;
+    }
+
+    /**
+     * Closes the container if necessary and serializes every bitmap.
+     *
+     * @return {@code nValues + 1} entries, one per bitmap in id order with
+     *         the NULL bitmap last; an empty bitmap becomes an empty byte array
+     */
+    public List<ImmutableBytesWritable> toBytes() {
+        if (!isClosedForChange())
+            closeForChange();
+
+        List<ImmutableBytesWritable> r = new ArrayList<ImmutableBytesWritable>(nValues + 1);
+        for (int i = 0; i <= nValues; i++) {
+            // A container that never received a value has no bitmaps yet;
+            // serialize it as all-empty instead of dereferencing null.
+            r.add(setToBytes(sets == null ? null : sets[i]));
+        }
+        return r;
+    }
+
+    /**
+     * Restores the container from the output of {@link #toBytes()} and marks
+     * it closed. The size becomes the total number of rows over all bitmaps.
+     */
+    public void fromBytes(List<ImmutableBytesWritable> bytes) {
+        assert nValues + 1 == bytes.size();
+        sets = new ConciseSet[nValues + 1];
+        size = 0;
+        for (int i = 0; i <= nValues; i++) {
+            sets[i] = bytesToSet(bytes.get(i));
+            size += sets[i].size();
+        }
+        closedForChange = true;
+    }
+
+    // Serializes one bitmap; a missing or empty bitmap yields an empty array.
+    private ImmutableBytesWritable setToBytes(ConciseSet set) {
+        byte[] array;
+        if (set == null || set.isEmpty()) // ConciseSet.toByteBuffer() throws exception when set is empty
+            array = BytesUtil.EMPTY_BYTE_ARRAY;
+        else
+            array = set.toByteBuffer().array();
+        return new ImmutableBytesWritable(array);
+    }
+
+    // Inverse of setToBytes(): empty input yields an empty bitmap.
+    private ConciseSet bytesToSet(ImmutableBytesWritable bytes) {
+        if (bytes.get() == null || bytes.getLength() == 0) {
+            return new ConciseSet();
+        } else {
+            IntBuffer intBuffer = ByteBuffer.wrap(bytes.get(), bytes.getOffset(), bytes.getLength()).asIntBuffer();
+            int[] words = new int[intBuffer.capacity()];
+            intBuffer.get(words);
+            return new ConciseSet(words, false);
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + (closedForChange ? 1231 : 1237);
+        result = prime * result + nValues;
+        result = prime * result + Arrays.hashCode(sets);
+        result = prime * result + size;
+        result = prime * result + valueLen;
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        BitMapContainer other = (BitMapContainer) obj;
+        if (closedForChange != other.closedForChange)
+            return false;
+        if (nValues != other.nValues)
+            return false;
+        if (!Arrays.equals(sets, other.sets))
+            return false;
+        if (size != other.size)
+            return false;
+        if (valueLen != other.valueLen)
+            return false;
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/invertedindex/ColumnValueContainer.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/ColumnValueContainer.java b/cube/src/main/java/com/kylinolap/cube/invertedindex/ColumnValueContainer.java
new file mode 100644
index 0000000..a2ac4c3
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/invertedindex/ColumnValueContainer.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.cube.invertedindex;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+/**
+ * Storage for the values of a single column. Values are appended one at a
+ * time, then the container is closed for change and read back by position.
+ *
+ * @author yangli9
+ * 
+ */
+public interface ColumnValueContainer {
+
+    /**
+     * Appends one value as the next entry.
+     *
+     * @param valueBytes the encoded value to append
+     */
+    void append(ImmutableBytesWritable valueBytes);
+
+    /**
+     * Ends the append phase of this container.
+     */
+    void closeForChange();
+
+    /**
+     * @return the number of values held by this container
+     */
+    int getSize();
+
+    /**
+     * Reads the value at position {@code i} into {@code valueBytes}.
+     *
+     * @param i          position of the value to read
+     * @param valueBytes receives the encoded value
+     */
+    // works only after closeForChange()
+    void getValueAt(int i, ImmutableBytesWritable valueBytes);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/invertedindex/CompressedValueContainer.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/CompressedValueContainer.java b/cube/src/main/java/com/kylinolap/cube/invertedindex/CompressedValueContainer.java
new file mode 100644
index 0000000..6874e8f
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/invertedindex/CompressedValueContainer.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.cube.invertedindex;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.kylinolap.common.util.BytesUtil;
+import com.ning.compress.lzf.LZFDecoder;
+import com.ning.compress.lzf.LZFEncoder;
+
+/**
+ * Column container that stores fixed-length values back to back in one byte
+ * array and LZF-compresses that array when it is closed for change.
+ *
+ * A non-null {@code compressed} field doubles as the "closed" flag.
+ *
+ * NOTE(review): {@link #fromBytes} marks the container closed with an empty
+ * placeholder array, so calling {@link #toBytes()} on a container restored
+ * that way returns the empty placeholder rather than the compressed values.
+ *
+ * @author yangli9
+ */
+public class CompressedValueContainer implements ColumnValueContainer {
+    int valueLen;
+    int cap;
+    int size;
+    byte[] uncompressed;
+    byte[] compressed;
+
+    /**
+     * @param info supplies the byte length of the column's values
+     * @param col  index of the column this container stores
+     * @param cap  maximum number of values that may be appended
+     */
+    public CompressedValueContainer(TableRecordInfoDigest info, int col, int cap) {
+        this.valueLen = info.length(col);
+        this.cap = cap;
+        this.size = 0;
+        this.uncompressed = null;
+        this.compressed = null;
+    }
+
+    /**
+     * Copies {@code valueLen} bytes of the given value to the end of the buffer.
+     *
+     * @throws IllegalArgumentException if the container is closed for change
+     */
+    @Override
+    public void append(ImmutableBytesWritable valueBytes) {
+        checkUpdateMode();
+        System.arraycopy(valueBytes.get(), valueBytes.getOffset(), uncompressed, valueLen * size, valueLen);
+        size++;
+    }
+
+    /**
+     * Points {@code valueBytes} at the bytes of value {@code i} inside the
+     * internal buffer; no copy is made.
+     */
+    @Override
+    public void getValueAt(int i, ImmutableBytesWritable valueBytes) {
+        valueBytes.set(uncompressed, valueLen * i, valueLen);
+    }
+
+    // Rejects updates on a closed container and allocates the buffer on
+    // first use.
+    private void checkUpdateMode() {
+        if (isClosedForChange()) {
+            throw new IllegalArgumentException();
+        }
+        if (uncompressed == null) {
+            uncompressed = new byte[valueLen * cap];
+        }
+    }
+
+    private boolean isClosedForChange() {
+        return compressed != null;
+    }
+
+    /**
+     * Compresses the appended values, which also marks the container closed.
+     *
+     * @throws IllegalArgumentException if the container is already closed
+     * @throws RuntimeException         if LZF encoding fails
+     */
+    @Override
+    public void closeForChange() {
+        checkUpdateMode();
+        try {
+            compressed = LZFEncoder.encode(uncompressed, 0, valueLen * size);
+        } catch (Exception e) {
+            throw new RuntimeException("LZF encode failure", e);
+        }
+    }
+
+    @Override
+    public int getSize() {
+        return size;
+    }
+
+    /**
+     * Closes the container if necessary and returns the {@code compressed}
+     * array wrapped in a writable.
+     */
+    public ImmutableBytesWritable toBytes() {
+        if (!isClosedForChange())
+            closeForChange();
+        return new ImmutableBytesWritable(compressed);
+    }
+
+    /**
+     * Restores the container from LZF-compressed bytes and marks it closed.
+     * Size and capacity become the number of whole values decoded.
+     *
+     * @throws RuntimeException if LZF decoding fails
+     */
+    public void fromBytes(ImmutableBytesWritable bytes) {
+        try {
+            uncompressed = LZFDecoder.decode(bytes.get(), bytes.getOffset(), bytes.getLength());
+        } catch (IOException e) {
+            throw new RuntimeException("LZF decode failure", e);
+        }
+        size = cap = uncompressed.length / valueLen;
+        compressed = BytesUtil.EMPTY_BYTE_ARRAY; // mark closed
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + size;
+        result = prime * result + valueLen;
+        // Hash only the bytes equals() compares. The buffer may be larger
+        // than size * valueLen, and containers with equal content but
+        // different capacity must hash alike.
+        int used = size * valueLen;
+        int contentHash = 1;
+        for (int i = 0; i < used; i++) {
+            contentHash = prime * contentHash + uncompressed[i];
+        }
+        result = prime * result + contentHash;
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        CompressedValueContainer other = (CompressedValueContainer) obj;
+        if (size != other.size)
+            return false;
+        if (valueLen != other.valueLen)
+            return false;
+        // Compare against the OTHER container's bytes; skip the check when
+        // nothing is stored, since either buffer may still be null then.
+        int used = size * valueLen;
+        if (used > 0 && !Bytes.equals(uncompressed, 0, used, other.uncompressed, 0, used))
+            return false;
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/invertedindex/IIKeyValueCodec.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/IIKeyValueCodec.java b/cube/src/main/java/com/kylinolap/cube/invertedindex/IIKeyValueCodec.java
new file mode 100644
index 0000000..7e77136
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/invertedindex/IIKeyValueCodec.java
@@ -0,0 +1,243 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.cube.invertedindex;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.collect.Lists;
+import com.kylinolap.common.util.BytesUtil;
+
+/**
+ * Converts between {@link Slice}s and key/value pairs.
+ *
+ * Key layout: shard ({@value #SHARD_LEN} bytes), slice timestamp
+ * ({@value #TIMEPART_LEN} bytes), column number ({@value #COLNO_LEN} bytes)
+ * and, for bitmap containers only, the column value the bitmap belongs to.
+ * A compressed value container produces a single pair without the value
+ * suffix; a bitmap container produces one pair per bitmap.
+ *
+ * @author yangli9
+ */
+public class IIKeyValueCodec {
+
+    public static final int SHARD_LEN = 2;
+    public static final int TIMEPART_LEN = 8;
+    public static final int COLNO_LEN = 2;
+
+    private TableRecordInfoDigest infoDigest;
+
+    public IIKeyValueCodec(TableRecordInfoDigest info) {
+        this.infoDigest = info;
+    }
+
+    /**
+     * Encodes every column container of a slice into key/value pairs, in
+     * column order.
+     *
+     * @throws IllegalArgumentException if a container is of an unsupported class
+     */
+    public Collection<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> encodeKeyValue(Slice slice) {
+        ArrayList<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> result = Lists.newArrayList();
+        ColumnValueContainer[] containers = slice.containers;
+        for (int col = 0; col < containers.length; col++) {
+            if (containers[col] instanceof BitMapContainer) {
+                collectKeyValues(slice, col, (BitMapContainer) containers[col], result);
+            } else if (containers[col] instanceof CompressedValueContainer) {
+                collectKeyValues(slice, col, (CompressedValueContainer) containers[col], result);
+            } else {
+                throw new IllegalArgumentException("Unkown container class " + containers[col].getClass());
+            }
+        }
+        return result;
+    }
+
+    // A compressed container becomes one pair whose key has no value suffix.
+    private void collectKeyValues(Slice slice, int col, CompressedValueContainer container, //
+            ArrayList<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> result) {
+        ImmutableBytesWritable key = encodeKey(slice.getShard(), slice.getTimestamp(), col, -1);
+        ImmutableBytesWritable value = container.toBytes();
+        result.add(new Pair<ImmutableBytesWritable, ImmutableBytesWritable>(key, value));
+    }
+
+    // A bitmap container becomes one pair per bitmap, keyed by its position.
+    private void collectKeyValues(Slice slice, int col, BitMapContainer container, //
+            ArrayList<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> result) {
+        List<ImmutableBytesWritable> values = container.toBytes();
+        for (int v = 0; v < values.size(); v++) {
+            ImmutableBytesWritable key = encodeKey(slice.getShard(), slice.getTimestamp(), col, v);
+            result.add(new Pair<ImmutableBytesWritable, ImmutableBytesWritable>(key, values.get(v)));
+        }
+    }
+
+    ImmutableBytesWritable encodeKey(short shard, long timestamp, int col, int colValue) {
+        // 2 + 8 + 2 bytes of fixed prefix leave 8 bytes for the optional value
+        byte[] bytes = new byte[20];
+        int len = encodeKey(shard, timestamp, col, colValue, bytes, 0);
+        return new ImmutableBytesWritable(bytes, 0, len);
+    }
+
+    /**
+     * Writes a key into {@code buf} starting at {@code offset}.
+     *
+     * @param colValue the column value to append, or a negative number to
+     *                 write the key without a value suffix
+     * @return the number of bytes written
+     */
+    int encodeKey(short shard, long timestamp, int col, int colValue, byte[] buf, int offset) {
+        int i = offset;
+
+        BytesUtil.writeUnsigned(shard, buf, i, SHARD_LEN);
+        i += SHARD_LEN;
+        BytesUtil.writeLong(timestamp, buf, i, TIMEPART_LEN);
+        i += TIMEPART_LEN;
+
+        BytesUtil.writeUnsigned(col, buf, i, COLNO_LEN);
+        i += COLNO_LEN;
+
+        if (colValue >= 0) {
+            int colLen = infoDigest.length(col);
+            BytesUtil.writeUnsigned(colValue, buf, i, colLen);
+            i += colLen;
+        }
+
+        return i - offset;
+    }
+
+    /**
+     * Lazily rebuilds slices from key/value pairs. The pairs must be ordered
+     * by key so that all pairs of a slice, and all bitmaps of a column, are
+     * adjacent.
+     */
+    public Iterable<Slice> decodeKeyValue(Iterable<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> kvs) {
+        return new Decoder(infoDigest, kvs);
+    }
+
+    /**
+     * Single-pass decoder. All decoding state lives in this object, so every
+     * iterator obtained from {@link #iterator()} advances the same underlying
+     * input.
+     */
+    private static class Decoder implements Iterable<Slice> {
+
+        TableRecordInfoDigest info;
+        Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> iterator;
+
+        Slice next = null; // decoded slice not yet handed out
+        // fields of the key decoded most recently
+        short curShard = Short.MIN_VALUE;
+        long curSliceTimestamp = Long.MIN_VALUE;
+        int curCol = -1;
+        int curColValue = -1;
+        // fields of the pair consumed before it
+        short lastShard = Short.MIN_VALUE;
+        long lastSliceTimestamp = Long.MIN_VALUE;
+        int lastCol = -1;
+        // containers of the slice being assembled, and the bitmaps collected
+        // so far for its current bitmap column
+        ColumnValueContainer[] containers = null;
+        List<ImmutableBytesWritable> bitMapValues = Lists.newArrayList();
+
+        Decoder(TableRecordInfoDigest info, Iterable<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> kvs) {
+            this.info = info;
+            this.iterator = kvs.iterator();
+        }
+
+        // Consumes input until one complete slice is available in 'next', or
+        // the input is exhausted.
+        private void goToNext() {
+            if (next != null) { // was not fetched
+                return;
+            }
+
+            // NOTE the input keys are ordered
+            while (next == null && iterator.hasNext()) {
+                Pair<ImmutableBytesWritable, ImmutableBytesWritable> kv = iterator.next();
+                ImmutableBytesWritable k = kv.getFirst();
+                ImmutableBytesWritable v = kv.getSecond();
+                decodeKey(k);
+
+                // a change of shard or timestamp ends the slice being assembled
+                if (curShard != lastShard || curSliceTimestamp != lastSliceTimestamp) {
+                    makeNext();
+                }
+                consumeCurrent(v);
+            }
+            if (next == null) {
+                // input exhausted: flush whatever has been assembled
+                makeNext();
+            }
+        }
+
+        // Parses a key into the cur* fields; curColValue is -1 when the key
+        // has no value suffix.
+        private void decodeKey(ImmutableBytesWritable k) {
+            byte[] buf = k.get();
+            int i = k.getOffset();
+
+            curShard = (short) BytesUtil.readUnsigned(buf, i, SHARD_LEN);
+            i += SHARD_LEN;
+            curSliceTimestamp = BytesUtil.readLong(buf, i, TIMEPART_LEN);
+            i += TIMEPART_LEN;
+
+            curCol = BytesUtil.readUnsigned(buf, i, COLNO_LEN);
+            i += COLNO_LEN;
+
+            if (i - k.getOffset() < k.getLength()) {
+                //bitmap
+                int colLen = info.length(curCol);
+                curColValue = BytesUtil.readUnsigned(buf, i, colLen);
+                i += colLen;
+            } else {
+                //value list
+                curColValue = -1;
+            }
+        }
+
+        // Adds the value of the current pair to the slice being assembled.
+        private void consumeCurrent(ImmutableBytesWritable v) {
+            if (curCol != lastCol && bitMapValues.size() > 0) { // end of a bitmap container
+                addBitMapContainer(lastCol);
+            }
+            if (curColValue < 0) {
+                CompressedValueContainer c = new CompressedValueContainer(info, curCol, 0);
+                c.fromBytes(v);
+                addContainer(curCol, c);
+            } else {
+                assert curColValue == bitMapValues.size();
+                // make a copy, the value object from caller is typically reused through iteration
+                bitMapValues.add(new ImmutableBytesWritable(v));
+            }
+
+            lastShard = curShard;
+            lastSliceTimestamp = curSliceTimestamp;
+            lastCol = curCol;
+        }
+
+        // Turns the containers assembled so far (if any) into 'next' and
+        // resets the assembly state.
+        private void makeNext() {
+            if (!bitMapValues.isEmpty()) {
+                addBitMapContainer(lastCol);
+            }
+            if (containers != null) {
+                next = new Slice(info, lastShard, lastSliceTimestamp, containers);
+            }
+            lastSliceTimestamp = Long.MIN_VALUE;
+            lastCol = -1;
+            containers = null;
+            bitMapValues.clear();
+        }
+
+        // Builds a bitmap container from the collected bitmaps of one column.
+        private void addBitMapContainer(int col) {
+            BitMapContainer c = new BitMapContainer(info, col);
+            c.fromBytes(bitMapValues);
+            addContainer(col, c);
+            bitMapValues.clear();
+        }
+
+        private void addContainer(int col, ColumnValueContainer c) {
+            if (containers == null) {
+                containers = new ColumnValueContainer[info.getColumnCount()];
+            }
+            containers[col] = c;
+        }
+
+        @Override
+        public Iterator<Slice> iterator() {
+            return new Iterator<Slice>() {
+                @Override
+                public boolean hasNext() {
+                    goToNext();
+                    return next != null;
+                }
+
+                @Override
+                public Slice next() {
+                    // Advance here as well so that next() works without a
+                    // preceding hasNext(), and signal exhaustion as the
+                    // Iterator contract requires instead of returning null.
+                    goToNext();
+                    if (next == null) {
+                        throw new java.util.NoSuchElementException();
+                    }
+                    Slice result = next;
+                    next = null;
+                    return result;
+                }
+
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            };
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/invertedindex/InvertedIndexCLI.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/InvertedIndexCLI.java b/cube/src/main/java/com/kylinolap/cube/invertedindex/InvertedIndexCLI.java
new file mode 100644
index 0000000..059be73
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/invertedindex/InvertedIndexCLI.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.cube.invertedindex;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.common.util.HadoopUtil;
+import com.kylinolap.cube.CubeInstance;
+import com.kylinolap.cube.CubeManager;
+
+/**
+ * Command line tool that prints every record of an inverted index stored in a
+ * Hadoop sequence file.
+ * <p/>
+ * Usage: {@code InvertedIndexCLI <cubeName> <sequenceFilePath>}
+ *
+ * @author yangli9
+ */
+public class InvertedIndexCLI {
+
+    /**
+     * Decodes the key/value pairs found at {@code args[1]} using the record
+     * layout of the first segment of cube {@code args[0]}, prints each record
+     * to standard output and finally prints the record count.
+     *
+     * @param args {@code args[0]} is the cube name, {@code args[1]} the sequence file path
+     * @throws IOException if the sequence file cannot be opened
+     * @throws IllegalArgumentException if an argument is missing or the cube is unknown
+     */
+    public static void main(String[] args) throws IOException {
+        if (args == null || args.length < 2) {
+            throw new IllegalArgumentException("Usage: InvertedIndexCLI <cubeName> <sequenceFilePath>");
+        }
+
+        Configuration hconf = HadoopUtil.getDefaultConfiguration();
+        CubeManager mgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+
+        String cubeName = args[0];
+        CubeInstance cube = mgr.getCube(cubeName);
+        if (cube == null) {
+            // fail with a readable message instead of a NullPointerException below
+            throw new IllegalArgumentException("Cube not found: " + cubeName);
+        }
+
+        String path = args[1];
+        System.out.println("Reading from " + path + " ...");
+
+        TableRecordInfo info = new TableRecordInfo(cube.getFirstSegment());
+        IIKeyValueCodec codec = new IIKeyValueCodec(info);
+        int count = 0;
+        for (Slice slice : codec.decodeKeyValue(readSequenceKVs(hconf, path))) {
+            for (TableRecordBytes rec : slice) {
+                // println() dispatches to the record's own toString()
+                System.out.println(rec);
+                count++;
+            }
+        }
+        System.out.println("Total " + count + " records");
+    }
+
+    /**
+     * Opens the sequence file at {@code path} and exposes its entries as an
+     * {@link Iterable} of key/value pairs.
+     * <p/>
+     * The result is single-pass: every iterator reads from the same underlying
+     * reader, which is closed as soon as the end of the file is reached or a
+     * read fails. The returned {@link Pair} and the writables inside it are
+     * reused for every element, so callers must copy whatever they want to keep.
+     *
+     * @param hconf Hadoop configuration used to open the file
+     * @param path  location of the sequence file
+     * @return a single-pass view of the file's key/value pairs
+     * @throws IOException if the reader cannot be created
+     */
+    public static Iterable<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> readSequenceKVs(Configuration hconf, String path) throws IOException {
+        final Reader reader = new Reader(hconf, SequenceFile.Reader.file(new Path(path)));
+        return new Iterable<Pair<ImmutableBytesWritable, ImmutableBytesWritable>>() {
+            @Override
+            public Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> iterator() {
+                return new Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>>() {
+                    ImmutableBytesWritable k = new ImmutableBytesWritable();
+                    ImmutableBytesWritable v = new ImmutableBytesWritable();
+                    Pair<ImmutableBytesWritable, ImmutableBytesWritable> pair = new Pair<ImmutableBytesWritable, ImmutableBytesWritable>(k, v);
+
+                    // true while (k, v) holds an entry that next() has not handed out yet
+                    boolean fetched = false;
+                    // true once the reader reported end-of-file or failed, and was closed
+                    boolean exhausted = false;
+
+                    @Override
+                    public boolean hasNext() {
+                        // repeated calls must not advance the reader, otherwise entries are skipped
+                        if (fetched) {
+                            return true;
+                        }
+                        if (exhausted) {
+                            return false;
+                        }
+
+                        boolean hasNext = false;
+                        try {
+                            hasNext = reader.next(k, v);
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        } finally {
+                            if (hasNext == false) {
+                                exhausted = true;
+                                IOUtils.closeQuietly(reader);
+                            }
+                        }
+                        fetched = hasNext;
+                        return hasNext;
+                    }
+
+                    @Override
+                    public Pair<ImmutableBytesWritable, ImmutableBytesWritable> next() {
+                        if (!hasNext()) {
+                            throw new java.util.NoSuchElementException();
+                        }
+                        fetched = false;
+                        return pair;
+                    }
+
+                    @Override
+                    public void remove() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/invertedindex/ShardingHash.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/ShardingHash.java b/cube/src/main/java/com/kylinolap/cube/invertedindex/ShardingHash.java
new file mode 100644
index 0000000..3bd4603
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/invertedindex/ShardingHash.java
@@ -0,0 +1,15 @@
+package com.kylinolap.cube.invertedindex;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
+/**
+ * Hashing helper used to spread records over shards. All hashes come from a
+ * single shared 128-bit murmur3 {@link HashFunction}.
+ */
+public class ShardingHash {
+
+    static HashFunction hashFunc = Hashing.murmur3_128();
+
+    /**
+     * Hashes a single {@code int}.
+     *
+     * @param integer the value to hash
+     * @return the hash code of {@code integer} as a {@code long}
+     */
+    public static long hashInt(int integer) {
+        // HashFunction.hashInt(x) is Guava's shortcut for newHasher().putInt(x).hash()
+        return hashFunc.hashInt(integer).asLong();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/invertedindex/ShardingSliceBuilder.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/ShardingSliceBuilder.java b/cube/src/main/java/com/kylinolap/cube/invertedindex/ShardingSliceBuilder.java
new file mode 100644
index 0000000..e9ffaf0
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/invertedindex/ShardingSliceBuilder.java
@@ -0,0 +1,35 @@
+package com.kylinolap.cube.invertedindex;
+
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Routes records to one {@link SliceBuilder} per shard and collects the
+ * slices those builders produce.
+ */
+public class ShardingSliceBuilder {
+
+    SliceBuilder[] builders;
+
+    /**
+     * Creates one builder for every shard declared by the descriptor of {@code info}.
+     */
+    public ShardingSliceBuilder(TableRecordInfo info) {
+        final int shardCount = info.getDescriptor().getSharding();
+        builders = new SliceBuilder[shardCount];
+        for (short shardNo = 0; shardNo < shardCount; shardNo++) {
+            builders[shardNo] = new SliceBuilder(info, shardNo);
+        }
+    }
+
+    /**
+     * Hands {@code rec} to the builder of the shard the record reports for itself.
+     * NOTE: record must be appended in time order
+     *
+     * @return whatever that shard's builder returns for this append
+     */
+    public Slice append(TableRecord rec) {
+        final SliceBuilder target = builders[rec.getShard()];
+        return target.append(rec);
+    }
+
+    /**
+     * Closes every shard builder, in shard order.
+     *
+     * @return the non-null slices returned by the builders' {@code close()} calls
+     */
+    public List<Slice> close() {
+        List<Slice> remaining = Lists.newArrayList();
+        for (int shardNo = 0; shardNo < builders.length; shardNo++) {
+            final Slice last = builders[shardNo].close();
+            if (last == null) {
+                continue;
+            }
+            remaining.add(last);
+        }
+        return remaining;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/invertedindex/Slice.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/Slice.java b/cube/src/main/java/com/kylinolap/cube/invertedindex/Slice.java
new file mode 100644
index 0000000..250ece4
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/invertedindex/Slice.java
@@ -0,0 +1,175 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.cube.invertedindex;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+/**
+ * Within a partition (per timestampGranularity), records are further sliced
+ * (per sliceLength) to fit into HBASE cell.
+ *
+ * @author yangli9
+ */
+public class Slice implements Iterable<TableRecordBytes>, Comparable<Slice> {
+
+    TableRecordInfoDigest info;
+    int nColumns;
+
+    short shard;
+    long timestamp;
+    int nRecords;
+    ColumnValueContainer[] containers;
+
+    Slice(TableRecordInfoDigest info, short shard, long timestamp, ColumnValueContainer[] containers) {
+        this.info = info;
+        this.nColumns = info.getColumnCount();
+
+        this.shard = shard;
+        this.timestamp = timestamp;
+        this.nRecords = containers[0].getSize();
+        this.containers = containers;
+
+        assert nColumns == containers.length;
+        for (int i = 0; i < nColumns; i++) {
+            assert nRecords == containers[i].getSize();
+        }
+    }
+
+    public short getShard() {
+        return shard;
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    /**
+     * Standard iterator of Slice will return a iterator of TableRecordBytes,
+     * which cannot be printed/formated to readable text.
+     * By invoking this API client can avoid code like:
+     * <p/>
+     * for (TableRecordBytes rec : slice) {
+     * TableRecord realRecord = (TableRecord) rec.clone();
+     * }
+     * <p/>
+     * Note this API cannot be called simultaneously with iterator()
+     *
+     * @return
+     */
+    public Iterator<TableRecord> readableIterator() {
+        final Iterator<TableRecordBytes> innerIterator = iterator();
+
+        return new Iterator<TableRecord>() {
+
+
+            @Override
+            public boolean hasNext() {
+                return innerIterator.hasNext();
+            }
+
+            @Override
+            public TableRecord next() {
+                return (TableRecord) innerIterator.next();
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+
+    @Override
+    public Iterator<TableRecordBytes> iterator() {
+        return new Iterator<TableRecordBytes>() {
+            int i = 0;
+            TableRecord rec = new TableRecord(info);
+            ImmutableBytesWritable temp = new ImmutableBytesWritable();
+
+            @Override
+            public boolean hasNext() {
+                return i < nRecords;
+            }
+
+            @Override
+            public TableRecordBytes next() {
+                for (int col = 0; col < nColumns; col++) {
+                    containers[col].getValueAt(i, temp);
+                    rec.setValueBytes(col, temp);
+                }
+                i++;
+                return rec;
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+
+        };
+    }
+
+    /* (non-Javadoc)
+     * @see java.lang.Object#hashCode()
+     */
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((info == null) ? 0 : info.hashCode());
+        result = prime * result + shard;
+        result = prime * result + (int) (timestamp ^ (timestamp >>> 32));
+        return result;
+    }
+
+    /* (non-Javadoc)
+     * @see java.lang.Object#equals(java.lang.Object)
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        Slice other = (Slice) obj;
+        if (info == null) {
+            if (other.info != null)
+                return false;
+        } else if (!info.equals(other.info))
+            return false;
+        if (shard != other.shard)
+            return false;
+        if (timestamp != other.timestamp)
+            return false;
+        return true;
+    }
+
+    @Override
+    public int compareTo(Slice o) {
+        int comp = this.shard - o.shard;
+        if (comp != 0)
+            return comp;
+
+        comp = (int) (this.timestamp - o.timestamp);
+        return comp;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/invertedindex/SliceBuilder.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/SliceBuilder.java b/cube/src/main/java/com/kylinolap/cube/invertedindex/SliceBuilder.java
new file mode 100644
index 0000000..72523a8
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/invertedindex/SliceBuilder.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.cube.invertedindex;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+
+/**
+ * @author yangli9
+ * 
+ */
+public class SliceBuilder {
+
+    TableRecordInfo info;
+    private int nColumns;
+    int nRecordsCap;
+
+    short shard;
+    long sliceTimestamp;
+    int nRecords;
+    private ColumnValueContainer[] containers;
+    
+    transient ImmutableBytesWritable temp = new ImmutableBytesWritable();
+
+    public SliceBuilder(TableRecordInfo info, short shard) {
+        this.info = info;
+        this.nColumns = info.getColumnCount();
+        this.nRecordsCap = Math.max(1, info.getDescriptor().getSliceSize());
+
+        this.shard = shard;
+        this.sliceTimestamp = Long.MIN_VALUE;
+        this.nRecords = 0;
+        this.containers = null;
+        
+        doneSlice(); // init containers
+    }
+
+    private Slice doneSlice() {
+        Slice r = null;
+        if (nRecords > 0) {
+            for (int i = 0; i < nColumns; i++) {
+                containers[i].closeForChange();
+            }
+            r = new Slice(info, shard, sliceTimestamp, containers);
+        }
+
+        // reset for next slice
+        nRecords = 0;
+        containers = new ColumnValueContainer[nColumns];
+        for (int i : info.getDescriptor().getBitmapColumns()) {
+            containers[i] = new BitMapContainer(info, i);
+        }
+        for (int i : info.getDescriptor().getValueColumns()) {
+            containers[i] = new CompressedValueContainer(info, i, nRecordsCap);
+        }
+        for (int i : info.getDescriptor().getMetricsColumns()) {
+            containers[i] = new CompressedValueContainer(info, i, nRecordsCap);
+        }
+
+        return r;
+
+    }
+
+    // NOTE: record must be appended in time order
+    public Slice append(TableRecord rec) {
+        if (rec.getShard() != shard)
+            throw new IllegalStateException();
+        
+        Slice doneSlice = null;
+        
+        if (isFull()) {
+            doneSlice = doneSlice();
+        }
+        
+        if (nRecords == 0) {
+            sliceTimestamp = increaseSliceTimestamp(rec.getTimestamp());
+        }
+
+        nRecords++;
+        for (int i = 0; i < nColumns; i++) {
+            rec.getValueBytes(i, temp);
+            containers[i].append(temp);
+        }
+
+        return doneSlice;
+    }
+
+    private long increaseSliceTimestamp(long timestamp) {
+        if (timestamp < sliceTimestamp)
+            throw new IllegalStateException();
+        
+        if (timestamp == sliceTimestamp)
+            return ++timestamp; // ensure slice timestamp increases
+        else
+            return timestamp;
+    }
+
+    public Slice close() {
+        Slice doneSlice = doneSlice();
+        this.sliceTimestamp = Long.MIN_VALUE;
+        this.nRecords = 0;
+        return doneSlice;
+    }
+
+    private boolean isFull() {
+        return nRecords >= nRecordsCap;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/invertedindex/TableRecord.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/TableRecord.java b/cube/src/main/java/com/kylinolap/cube/invertedindex/TableRecord.java
new file mode 100644
index 0000000..53601e2
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/invertedindex/TableRecord.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.cube.invertedindex;
+
+import com.kylinolap.dict.DateStrDictionary;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * A {@link TableRecordBytes} backed by a full {@link TableRecordInfo}, which
+ * allows column values to be read and written as strings through the column's
+ * dictionary (dimensions) or codec (metrics).
+ *
+ * @author yangli9
+ */
+public class TableRecord extends TableRecordBytes {
+
+    /**
+     * Creates a record with all bytes reset.
+     *
+     * @param info record layout; must be a {@link TableRecordInfo}
+     * @throws IllegalStateException if {@code info} is not a {@link TableRecordInfo}
+     */
+    public TableRecord(TableRecordInfoDigest info) {
+        super();
+
+        if (!(info instanceof TableRecordInfo)) {
+            throw new IllegalStateException("Table Record must be initialized with a TableRecordInfo");
+        }
+
+        this.info = info;
+        this.buf = new byte[info.byteFormLen];
+        reset();
+    }
+
+    /**
+     * Copy constructor: shares {@code info} with {@code another} and copies its bytes.
+     */
+    public TableRecord(TableRecord another) {
+        super();
+
+        this.info = another.info;
+        this.buf = Bytes.copy(another.buf);
+    }
+
+    @Override
+    public Object clone() {
+        return new TableRecord(this);
+    }
+
+    /**
+     * @return the value of the timestamp column, converted to milliseconds by
+     *         {@link DateStrDictionary#stringToMillis(String)}
+     */
+    public long getTimestamp() {
+        String str = getValueString(info().getTimestampColumn());
+        return DateStrDictionary.stringToMillis(str);
+    }
+
+    public int length(int col) {
+        return info.length(col);
+    }
+
+    /**
+     * Encodes {@code value} into column {@code col}: through the column codec
+     * for a metrics column, through the column dictionary otherwise.
+     */
+    public void setValueString(int col, String value) {
+        if (info().isMetrics(col)) {
+            LongWritable v = info().codec(col).valueOf(value);
+            setValueMetrics(col, v);
+        } else {
+            int id = info().dict(col).getIdFromValue(value);
+            setValueID(col, id);
+        }
+    }
+
+    /**
+     * Decodes column {@code col} back to a string; the inverse of
+     * {@link #setValueString(int, String)}.
+     */
+    public String getValueString(int col) {
+        if (info().isMetrics(col))
+            return info().codec(col).toString(getValueMetrics(col));
+        else
+            return info().dict(col).getValueFromId(getValueID(col));
+    }
+
+    /**
+     * Copies the bytes of column {@code col} from {@code bytes} into this record.
+     */
+    public void setValueBytes(int col, ImmutableBytesWritable bytes) {
+        System.arraycopy(bytes.get(), bytes.getOffset(), buf, info.offset(col), info.length(col));
+    }
+
+    /**
+     * Points {@code bytes} at the bytes of column {@code col} inside this
+     * record's buffer; nothing is copied.
+     */
+    public void getValueBytes(int col, ImmutableBytesWritable bytes) {
+        bytes.set(buf, info.offset(col), info.length(col));
+    }
+
+    private void setValueMetrics(int col, LongWritable value) {
+        info().codec(col).write(value, buf, info.offset(col));
+    }
+
+    private LongWritable getValueMetrics(int col) {
+        return info().codec(col).read(buf, info.offset(col));
+    }
+
+    /**
+     * Derives the shard of this record from the hash of its timestamp column ID.
+     *
+     * @return a shard number in {@code [0, sharding)}
+     */
+    public short getShard() {
+        int timestampID = getValueID(info().getTimestampColumn());
+        long hash = ShardingHash.hashInt(timestampID);
+        // Take the remainder before the absolute value: Math.abs(Long.MIN_VALUE) is
+        // still negative and could yield a negative shard. For every other hash
+        // this gives exactly the same shard as Math.abs(hash) % sharding.
+        return (short) Math.abs(hash % info().getDescriptor().getSharding());
+    }
+
+    public TableRecordInfo info() {
+        return (TableRecordInfo) info;
+    }
+
+    /**
+     * @return the decoded column values, comma separated and enclosed in square brackets
+     */
+    @Override
+    public String toString() {
+        // named 'text' so that it does not shadow the 'buf' byte array field
+        StringBuilder text = new StringBuilder("[");
+        for (int col = 0; col < info.getColumnCount(); col++) {
+            if (col > 0)
+                text.append(",");
+            text.append(getValueString(col));
+        }
+        text.append("]");
+        return text.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/invertedindex/TableRecordBytes.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/TableRecordBytes.java b/cube/src/main/java/com/kylinolap/cube/invertedindex/TableRecordBytes.java
new file mode 100644
index 0000000..a40362a
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/invertedindex/TableRecordBytes.java
@@ -0,0 +1,84 @@
+package com.kylinolap.cube.invertedindex;
+
+import com.kylinolap.common.util.BytesUtil;
+import com.kylinolap.dict.Dictionary;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.Arrays;
+
+/**
+ * A table record in its byte form: a single buffer holding the values of all
+ * columns back to back, laid out according to a {@link TableRecordInfoDigest}.
+ * <p/>
+ * Created by honma on 11/10/14.
+ */
+public class TableRecordBytes implements Cloneable {
+    TableRecordInfoDigest info;
+    byte[] buf; // consecutive column value IDs (encoded by dictionary)
+
+    /**
+     * Leaves {@code info} and {@code buf} unset; the caller must initialize them.
+     */
+    public TableRecordBytes() {
+    }
+
+    /**
+     * Writes {@code id} as an unsigned number into the bytes of column {@code col}.
+     */
+    public void setValueID(int col, int id) {
+        BytesUtil.writeUnsigned(id, buf, info.offset(col), info.length(col));
+    }
+
+    /**
+     * Reads the bytes of column {@code col} as an unsigned number.
+     */
+    public int getValueID(int col) {
+        return BytesUtil.readUnsigned(buf, info.offset(col), info.length(col));
+    }
+
+    /**
+     * @return the backing buffer itself, not a copy
+     */
+    public byte[] getBytes() {
+        return buf;
+    }
+
+    /**
+     * Overwrites the whole record with {@code length} bytes of {@code bytes}
+     * starting at {@code offset}; {@code length} is expected to equal the
+     * record's buffer length.
+     */
+    public void setBytes(byte[] bytes, int offset, int length) {
+        assert buf.length == length;
+        System.arraycopy(bytes, offset, buf, 0, length);
+    }
+
+    /**
+     * Fills every byte of the record with {@link Dictionary#NULL}.
+     */
+    public void reset() {
+        Arrays.fill(buf, Dictionary.NULL);
+    }
+
+    /**
+     * Creates a record of {@code info.byteFormLen} bytes with all bytes reset.
+     */
+    public TableRecordBytes(TableRecordInfoDigest info) {
+        this.info = info;
+        this.buf = new byte[info.byteFormLen];
+        reset();
+    }
+
+    /**
+     * Copy constructor: shares {@code info} with {@code another} and copies its bytes.
+     */
+    public TableRecordBytes(TableRecordBytes another) {
+        this.info = another.info;
+        this.buf = Bytes.copy(another.buf);
+    }
+
+    @Override
+    public Object clone() {
+        return new TableRecordBytes(this);
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + Arrays.hashCode(buf);
+        result = prime * result + ((info == null) ? 0 : info.hashCode());
+        return result;
+    }
+
+    /**
+     * Two records are equal when they are of the same class and have equal
+     * bytes and equal {@code info}.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        // Cast to this class, not to the TableRecord subclass: after the getClass()
+        // check obj may be a plain TableRecordBytes, and a subclass cast would then
+        // throw ClassCastException.
+        TableRecordBytes other = (TableRecordBytes) obj;
+        if (!Arrays.equals(buf, other.buf))
+            return false;
+        if (info == null) {
+            if (other.info != null)
+                return false;
+        } else if (!info.equals(other.info))
+            return false;
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/cube/src/main/java/com/kylinolap/cube/invertedindex/TableRecordFactory.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/TableRecordFactory.java b/cube/src/main/java/com/kylinolap/cube/invertedindex/TableRecordFactory.java
new file mode 100644
index 0000000..6c6e02d
--- /dev/null
+++ b/cube/src/main/java/com/kylinolap/cube/invertedindex/TableRecordFactory.java
@@ -0,0 +1,8 @@
+package com.kylinolap.cube.invertedindex;
+
+/**
+ * Factory of {@link TableRecordBytes} instances.
+ * <p/>
+ * Created by honma on 11/10/14.
+ */
+public interface TableRecordFactory {
+
+    /**
+     * @return a table record supplied by this factory
+     */
+    TableRecordBytes createTableRecord();
+}