diff --git a/cube/src/main/java/com/kylinolap/cube/dataGen/ b/cube/src/main/java/com/kylinolap/cube/dataGen/
--- a/cube/src/main/java/com/kylinolap/cube/dataGen/
-package com.kylinolap.cube.dataGen;
-import java.text.SimpleDateFormat;
-import java.util.*;
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.common.persistence.ResourceStore;
-import com.kylinolap.common.util.Array;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.metadata.MetadataManager;
-import com.kylinolap.metadata.model.cube.CubeDesc;
-import com.kylinolap.metadata.model.cube.DimensionDesc;
-import com.kylinolap.metadata.model.cube.JoinDesc;
-import com.kylinolap.metadata.model.cube.MeasureDesc;
-import com.kylinolap.metadata.model.cube.TblColRef;
-import com.kylinolap.metadata.model.schema.ColumnDesc;
-import com.kylinolap.metadata.model.schema.DataType;
-/**
- * Created by hongbin on 5/20/14.
- */
-public class FactTableGenerator {
-    CubeInstance cube = null;
-    CubeDesc desc = null;
-    ResourceStore store = null;
-    String factTableName = null;
-    GenConfig genConf = null;
-    Random r = null;
-    String cubeName;
-    long randomSeed;
-    int rowCount;
-    int unlinkableRowCount;
-    int unlinkableRowCountMax;
-    double conflictRatio;
-    double linkableRatio;
-    // lookup table name -> names of its columns that are joined to the fact
-    // table (i.e. that appear as foreign keys in the fact table)
-    Hashtable<String, LinkedList<String>> lookupTableKeys = new Hashtable<String, LinkedList<String>>();
-    // candidate values for each column in lookupTableKeys, extracted from the
-    // existing lookup tables.
-    // The key has the format tablename/columnname
-    HashMap<String, ArrayList<String>> feasibleValues = new HashMap<String, ArrayList<String>>();
-    // lookup table name -> sets of all composite keys
-    HashMap<String, HashSet<Array<String>>> lookupTableCompositeKeyValues = new HashMap<String, HashSet<Array<String>>>();
-    /**
-     * Stores the generation parameters on this instance, seeds the random
-     * source and resolves the cube, its descriptor, the fact table name and
-     * the resource store from the environment's Kylin configuration.
-     *
-     * @param cubeName      name of the cube looked up through CubeManager
-     * @param rowCount      number of fact rows to generate
-     * @param conflictRaio  conflict ratio, stored as-is in conflictRatio
-     * @param linkableRatio ratio used to cap the unlinkable rows at
-     *                      rowCount * (1 - linkableRatio)
-     * @param randomSeed    seed of the Random shared by all generation steps
-     */
-    private void init(String cubeName, int rowCount, double conflictRaio, double linkableRatio, long randomSeed) {
-        // plain parameters
-        this.cubeName = cubeName;
-        this.rowCount = rowCount;
-        this.conflictRatio = conflictRaio;
-        this.linkableRatio = linkableRatio;
-        this.randomSeed = randomSeed;
-        // bookkeeping for rows that fail the composite key check
-        this.unlinkableRowCount = 0;
-        this.unlinkableRowCountMax = (int) (rowCount * (1 - linkableRatio));
-        this.r = new Random(randomSeed);
-        // metadata handles, all resolved from the same environment config
-        KylinConfig envConfig = KylinConfig.getInstanceFromEnv();
-        this.cube = CubeManager.getInstance(envConfig).getCube(cubeName);
-        this.desc = this.cube.getDescriptor();
-        this.factTableName = this.desc.getFactTable();
-        this.store = ResourceStore.getStore(envConfig);
-    }
-    /**
-     * Loads the per-column value preferences from
-     * "/data/data_gen_config.json" in the resource store into genConf.
-     * Users can specify the value preference for each column there.
-     *
-     * An IOException while fetching or closing the resource is printed and
-     * otherwise ignored, as before.
-     */
-    private void loadConfig() {
-        InputStream configStream = null;
-        try {
-            configStream = store.getResource("/data/data_gen_config.json");
-            this.genConf = GenConfig.loadConfig(configStream);
-        } catch (IOException e) {
-            e.printStackTrace();
-        } finally {
-            // close in finally so the stream is released even when parsing
-            // throws something other than IOException
-            if (configStream != null) {
-                try {
-                    configStream.close();
-                } catch (IOException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
-    /**
-     * Samples rows from "/data/&lt;lookupTableName&gt;.csv" and records, for each
-     * of the given key columns, the sampled values in feasibleValues (under
-     * "table/column"). When more than one key column is given, the sampled
-     * key combinations are also stored in lookupTableCompositeKeyValues.
-     *
-     * The file is read twice: once to count its rows, once to pick the
-     * randomly chosen row numbers.
-     *
-     * @param lookupTableName  lookup table whose CSV is read
-     * @param columnNames      key columns of that table joined to the fact table
-     * @param distinctRowCount wanted number of sampled rows; capped at the
-     *                         number of rows in the file
-     * @throws Exception if a sampled composite key occurs twice in the file
-     */
-    private void loadLookupTableValues(String lookupTableName, LinkedList<String> columnNames, int distinctRowCount) throws Exception {
-        KylinConfig config = KylinConfig.getInstanceFromEnv();
-        // only deal with composite keys
-        if (columnNames.size() > 1 && !lookupTableCompositeKeyValues.containsKey(lookupTableName)) {
-            lookupTableCompositeKeyValues.put(lookupTableName, new HashSet<Array<String>>());
-        }
-        InputStream tableStream = null;
-        BufferedReader tableReader = null;
-        try {
-            Hashtable<String, Integer> zeroBasedInice = new Hashtable<String, Integer>();
-            for (String columnName : columnNames) {
-                ColumnDesc cDesc = MetadataManager.getInstance(config).getTableDesc(lookupTableName).findColumnByName(columnName);
-                zeroBasedInice.put(columnName, cDesc.getZeroBasedIndex());
-            }
-            String path = "/data/" + lookupTableName + ".csv";
-            tableStream = store.getResource(path);
-            tableReader = new BufferedReader(new InputStreamReader(tableStream));
-            // first pass: count the rows
-            int rowCount = 0;
-            int curRowNum = 0;
-            String curRow;
-            while (tableReader.readLine() != null)
-                rowCount++;
-            // choose which row numbers to keep
-            HashSet<Integer> rows = new HashSet<Integer>();
-            distinctRowCount = (distinctRowCount < rowCount) ? distinctRowCount : rowCount;
-            while (rows.size() < distinctRowCount) {
-                rows.add(r.nextInt(rowCount));
-            }
-            // reopen the stream
-            tableStream.close();
-            tableReader.close();
-            tableStream = null;
-            tableReader = null;
-            tableStream = store.getResource(path);
-            tableReader = new BufferedReader(new InputStreamReader(tableStream));
-            // second pass: collect the values of the chosen rows
-            while ((curRow = tableReader.readLine()) != null) {
-                if (rows.contains(curRowNum)) {
-                    String[] tokens = curRow.split(",");
-                    String[] comboKeys = null;
-                    int index = 0;
-                    if (columnNames.size() > 1)
-                        comboKeys = new String[columnNames.size()];
-                    for (String columnName : columnNames) {
-                        int zeroBasedIndex = zeroBasedInice.get(columnName);
-                        if (!feasibleValues.containsKey(lookupTableName + "/" + columnName))
-                            feasibleValues.put(lookupTableName + "/" + columnName, new ArrayList<String>());
-                        feasibleValues.get(lookupTableName + "/" + columnName).add(tokens[zeroBasedIndex]);
-                        if (columnNames.size() > 1) {
-                            comboKeys[index] = tokens[zeroBasedIndex];
-                            index++;
-                        }
-                    }
-                    if (columnNames.size() > 1) {
-                        Array<String> wrap = new Array<String>(comboKeys);
-                        if (lookupTableCompositeKeyValues.get(lookupTableName).contains(wrap)) {
-                            throw new Exception("The composite key already exist in the lookup table");
-                        }
-                        lookupTableCompositeKeyValues.get(lookupTableName).add(wrap);
-                    }
-                }
-                curRowNum++;
-            }
-            if (tableStream != null)
-                tableStream.close();
-            if (tableReader != null)
-                tableReader.close();
-            // mark both as released so the finally block leaves them alone
-            tableStream = null;
-            tableReader = null;
-        } catch (IOException e) {
-            e.printStackTrace();
-            System.exit(1);
-        } finally {
-            // handles are still set here only when something above threw;
-            // release them without hiding that first failure
-            if (tableReader != null) {
-                try {
-                    tableReader.close();
-                } catch (IOException ignored) {
-                    // best effort, the original exception is the relevant one
-                }
-            }
-            if (tableStream != null) {
-                try {
-                    tableStream.close();
-                } catch (IOException ignored) {
-                    // best effort, the original exception is the relevant one
-                }
-            }
-        }
-    }
-    /**
-     * Prepares the candidate values for each joined column: loads the
-     * generator config, collects the primary key columns of every dimension
-     * that has a join into lookupTableKeys, then samples values for those
-     * columns from each lookup table.
-     *
-     * @throws Exception propagated from loadLookupTableValues
-     */
-    private void prepare() throws Exception {
-        // load config
-        loadConfig();
-        for (DimensionDesc dim : desc.getDimensions()) {
-            JoinDesc join = dim.getJoin();
-            if (join == null)
-                continue;
-            String lookupTable = dim.getTable();
-            for (String column : join.getPrimaryKey()) {
-                LinkedList<String> keys = lookupTableKeys.get(lookupTable);
-                if (keys == null) {
-                    keys = new LinkedList<String>();
-                    lookupTableKeys.put(lookupTable, keys);
-                }
-                // keep first-seen order, no duplicates
-                if (!keys.contains(column))
-                    keys.add(column);
-            }
-        }
-        // at least one row is sampled from every lookup table
-        int distinctRowCount = (int) (this.rowCount / this.conflictRatio);
-        distinctRowCount = (distinctRowCount == 0) ? 1 : distinctRowCount;
-        // lookup tables
-        for (Map.Entry<String, LinkedList<String>> entry : lookupTableKeys.entrySet()) {
-            this.loadLookupTableValues(entry.getKey(), entry.getValue(), distinctRowCount);
-        }
-    }
-    /**
-     * Returns the cube's dimensions ordered by the number of primary key
-     * columns in their join, largest first; dimensions without a join count
-     * as zero. The sort is stable.
-     *
-     * A copy is sorted, so the list owned by the cube descriptor keeps its
-     * original order.
-     *
-     * @return a new list holding the sorted dimensions
-     */
-    private List<DimensionDesc> getSortedDimentsionDescs() {
-        List<DimensionDesc> dimensions = new ArrayList<DimensionDesc>(desc.getDimensions());
-        Collections.sort(dimensions, new Comparator<DimensionDesc>() {
-            @Override
-            public int compare(DimensionDesc o1, DimensionDesc o2) {
-                // operands are swapped on purpose: descending order
-                return Integer.valueOf(primaryKeyCount(o2)).compareTo(primaryKeyCount(o1));
-            }
-            private int primaryKeyCount(DimensionDesc dim) {
-                JoinDesc join = dim.getJoin();
-                return join != null ? join.getPrimaryKey().length : 0;
-            }
-        });
-        return dimensions;
-    }
-    /**
-     * Generates the fact table into a temp file and stores it twice in the
-     * resource store: as "/data/&lt;factTable&gt;.csv" and as
-     * "/data/&lt;factTable&gt;.csv.&lt;jointype&gt;" (join type lower-cased), so that
-     * later test cases can select different data files. The temp file is
-     * removed afterwards, also when an upload fails.
-     *
-     * @param joinType join type appended to the name of the second copy
-     * @throws Exception propagated from generation or from the resource store
-     */
-    private void execute(String joinType) throws Exception {
-        // main logic here , generate the data to a temp file
-        String tempFilePath = generate();
-        File tempFile = new File(tempFilePath);
-        String factTablePath = "/data/" + factTableName + ".csv";
-        try {
-            replaceResource(factTablePath, tempFile);
-            // duplicate a copy of this fact table, with the join type added
-            // to its name
-            String factTablePathWithJoinType = "/data/" + factTableName + ".csv." + joinType.toLowerCase();
-            replaceResource(factTablePathWithJoinType, tempFile);
-        } finally {
-            tempFile.delete();
-        }
-        System.out.println();
-        System.out.println("The new fact table has been written to $KYLIN_METADATA_URL" + factTablePath);
-        System.out.println();
-    }
-    // Replaces the resource at resPath with the content of source, always closing the file stream.
-    private void replaceResource(String resPath, File source) throws Exception {
-        InputStream in = new FileInputStream(source);
-        try {
-            store.deleteResource(resPath);
-            store.putResource(resPath, in, System.currentTimeMillis());
-        } finally {
-            in.close();
-        }
-    }
-    /**
-     * Classifies the fact table columns and hands the result to createTable,
-     * which writes the generated rows into a temp file.
-     *
-     * @return whatever createTable returns (the temp file path)
-     * @throws Exception propagated from createTable
-     */
-    private String generate() throws Exception {
-        // fact table fk column -> "lookupTable/pkColumn" it takes its values from
-        HashMap<String, String> factTableCol2LookupCol = new HashMap<String, String>();
-        // "lookupTable/pkColumn" -> fact table fk column
-        HashMap<String, String> lookupCol2factTableCol = new HashMap<String, String>();
-        // fact table columns used directly as a dimension or by a measure
-        HashSet<String> usedCols = new HashSet<String>();
-        List<DimensionDesc> dimensions = getSortedDimentsionDescs();
-        // pass 1: fact table columns that are foreign keys
-        for (DimensionDesc dim : dimensions) {
-            JoinDesc jDesc = dim.getJoin();
-            if (jDesc == null)
-                continue; // dealt with in the next round
-            String[] fks = jDesc.getForeignKey();
-            String[] pks = jDesc.getPrimaryKey();
-            for (int i = 0; i < fks.length; ++i) {
-                String fk = fks[i];
-                String lookupCol = dim.getTable() + "/" + pks[i];
-                lookupCol2factTableCol.put(lookupCol, fk);
-                String chosen = factTableCol2LookupCol.get(fk);
-                if (chosen != null && !chosen.equals(lookupCol)) {
-                    // the mapping picked first wins
-                    System.out.println("Warning: Disambiguation on the mapping of column " + fk + ", " + chosen + "(chosen) or " + lookupCol);
-                    continue;
-                }
-                factTableCol2LookupCol.put(fk, lookupCol);
-            }
-        }
-        // pass 2: fact table columns used directly as a dimension.
-        // Must stay a separate pass: it needs the complete fk mapping.
-        for (DimensionDesc dim : dimensions) {
-            if (dim.getJoin() != null)
-                continue;
-            String column = dim.getColumn();
-            if (!factTableCol2LookupCol.containsKey(column))
-                usedCols.add(column);
-        }
-        // pass 3: fact table columns referenced by measures
-        for (MeasureDesc mDesc : desc.getMeasures()) {
-            List<TblColRef> pcols = mDesc.getFunction().getParameter().getColRefs();
-            if (pcols == null)
-                continue;
-            for (TblColRef col : pcols) {
-                String column = col.getName();
-                if (!factTableCol2LookupCol.containsKey(column))
-                    usedCols.add(column);
-            }
-        }
-        return createTable(this.rowCount, factTableCol2LookupCol, lookupCol2factTableCol, usedCols);
-    }
-    // Renders v in decimal, prefixing "0" whenever v is below 10.
-    private String normToTwoDigits(int v) {
-        String digits = Integer.toString(v);
-        return (v < 10) ? "0" + digits : digits;
-    }
-    // Picks one candidate at an index drawn from the shared random source.
-    private String randomPick(ArrayList<String> candidates) {
-        return candidates.get(r.nextInt(candidates.size()));
-    }
-    /**
-     * Creates a random cell value inside a configured range.
-     *
-     * @param cDesc column the value is generated for; its data type selects
-     *              how the range is parsed
-     * @param range two strings: range.get(0) is the lower and range.get(1)
-     *              the upper bound (dates as yyyy-MM-dd)
-     * @return the generated value; numeric non-integer values use "%.4f",
-     *         dates are moved to the first day of their week
-     * @throws Exception for string columns, which cannot take a range, or
-     *                   when a date bound cannot be parsed
-     */
-    private String createRandomCell(ColumnDesc cDesc, ArrayList<String> range) throws Exception {
-        DataType type = cDesc.getType();
-        if (type.isStringFamily()) {
-            throw new Exception("Can't handle range values for string");
-        } else if (type.isIntegerFamily()) {
-            int low = Integer.parseInt(range.get(0));
-            int high = Integer.parseInt(range.get(1));
-            if (high == low) {
-                // Random.nextInt(0) throws; a one-value range can only yield
-                // that value
-                return Integer.toString(low);
-            }
-            // upper bound is exclusive
-            return Integer.toString(r.nextInt(high - low) + low);
-        } else if (type.isDouble()) {
-            double low = Double.parseDouble(range.get(0));
-            double high = Double.parseDouble(range.get(1));
-            return String.format("%.4f", r.nextDouble() * (high - low) + low);
-        } else if (type.isFloat()) {
-            float low = Float.parseFloat(range.get(0));
-            float high = Float.parseFloat(range.get(1));
-            return String.format("%.4f", r.nextFloat() * (high - low) + low);
-        } else if (type.isDecimal()) {
-            double low = Double.parseDouble(range.get(0));
-            double high = Double.parseDouble(range.get(1));
-            return String.format("%.4f", r.nextDouble() * (high - low) + low);
-        } else if (type.isDateTimeFamily()) {
-            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
-            Date start = format.parse(range.get(0));
-            Date end = format.parse(range.get(1));
-            long diff = end.getTime() - start.getTime();
-            Date temp = new Date(start.getTime() + (long) (diff * r.nextDouble()));
-            Calendar cal = Calendar.getInstance();
-            cal.setTime(temp);
-            // first day
-            cal.set(Calendar.DAY_OF_WEEK, cal.getFirstDayOfWeek());
-            return cal.get(Calendar.YEAR) + "-" + normToTwoDigits(cal.get(Calendar.MONTH) + 1) + "-" + normToTwoDigits(cal.get(Calendar.DAY_OF_MONTH));
-        } else {
-            System.out.println("The data type " + type + " is not recognized");
-            System.exit(1);
-        }
-        return null;
-    }
-    /**
-     * Creates a random cell value for a column that has no configured values,
-     * based only on the column's type name.
-     *
-     * Strings are two letters out of 'a'..'j', integers lie in [0, 128),
-     * double/float/decimal values lie in [0, 100) formatted with "%.4f", and
-     * dates are drawn between 2001-01-01 and 2013-12-31 and then moved to
-     * the first day of their week. An unknown type name terminates the JVM.
-     *
-     * @param cDesc column the value is generated for
-     * @return the generated value
-     */
-    private String createRandomCell(ColumnDesc cDesc) {
-        String type = cDesc.getTypeName();
-        String s = type.toLowerCase();
-        if (s.equals("string") || s.equals("char") || s.equals("varchar")) {
-            StringBuilder sb = new StringBuilder();
-            for (int i = 0; i < 2; i++) {
-                sb.append((char) ('a' + r.nextInt(10)));// there are 10*10
-                // possible strings
-            }
-            return sb.toString();
-        } else if (s.equals("bigint") || s.equals("int") || s.equals("tinyint") || s.equals("smallint")) {
-            return Integer.toString(r.nextInt(128));
-        } else if (s.equals("double")) {
-            return String.format("%.4f", r.nextDouble() * 100);
-        } else if (s.equals("float")) {
-            return String.format("%.4f", r.nextFloat() * 100);
-        } else if (s.equals("decimal")) {
-            return String.format("%.4f", r.nextDouble() * 100);
-        } else if (s.equals("date")) {
-            // The former hard-coded millisecond constants were about 1900
-            // years off (years 3901 and 3914); compute the intended bounds.
-            long date20131231 = new GregorianCalendar(2013, Calendar.DECEMBER, 31).getTimeInMillis();
-            long date20010101 = new GregorianCalendar(2001, Calendar.JANUARY, 1).getTimeInMillis();
-            long diff = date20131231 - date20010101;
-            Date temp = new Date(date20010101 + (long) (diff * r.nextDouble()));
-            Calendar cal = Calendar.getInstance();
-            cal.setTime(temp);
-            // first day
-            cal.set(Calendar.DAY_OF_WEEK, cal.getFirstDayOfWeek());
-            return cal.get(Calendar.YEAR) + "-" + normToTwoDigits(cal.get(Calendar.MONTH) + 1) + "-" + normToTwoDigits(cal.get(Calendar.DAY_OF_MONTH));
-        } else {
-            System.out.println("The data type " + type + " is not recognized");
-            System.exit(1);
-        }
-        return null;
-    }
-    /**
-     * Returns the fixed placeholder value for a column that is not
-     * referenced by the cube: "abcde" for string types, "0" for numeric
-     * types and "1970-01-01" for dates. An unknown type name terminates the
-     * JVM.
-     *
-     * @param type the column's type name, matched case-insensitively
-     * @return the placeholder value
-     */
-    private String createDefaultsCell(String type) {
-        String s = type.toLowerCase();
-        if (s.equals("string") || s.equals("char") || s.equals("varchar")) {
-            return "abcde";
-        }
-        if (s.equals("bigint") || s.equals("int") || s.equals("tinyint") || s.equals("smallint")
-                || s.equals("double") || s.equals("float") || s.equals("decimal")) {
-            // every numeric type shares the same default
-            return "0";
-        }
-        if (s.equals("date")) {
-            return "1970-01-01";
-        }
-        System.out.println("The data type " + type + " is not recognized");
-        System.exit(1);
-        return null;
-    }
-    private void printColumnMappings(HashMap<String, String> factTableCol2LookupCol, HashSet<String> usedCols, HashSet<String> defaultColumns) {
-        System.out.println("=======================================================================");
-        System.out.format("%-30s %s", "FACT_TABLE_COLUMN", "MAPPING");
-        System.out.println();
-        System.out.println();
-        for (Map.Entry<String, String> entry : factTableCol2LookupCol.entrySet()) {
-            System.out.format("%-30s %s", entry.getKey(), entry.getValue());
-            System.out.println();
-        }
-        for (String key : usedCols) {
-            System.out.format("%-30s %s", key, "Random Values");
-            System.out.println();
-        }
-        for (String key : defaultColumns) {
-            System.out.format("%-30s %s", key, "Default Values");
-            System.out.println();
-        }
-        System.out.println("=======================================================================");
-        System.out.println("Parameters:");
-        System.out.println();
-        System.out.println("CubeName:        " + cubeName);
-        System.out.println("RowCount:        " + rowCount);
-        System.out.println("ConflictRatio:   " + conflictRatio);
-        System.out.println("LinkableRatio:   " + linkableRatio);
-        System.out.println("Seed:            " + randomSeed);
-        System.out.println();
-        System.out.println("The number of actual unlinkable fact rows is: " + this.unlinkableRowCount);
-        System.out.println("You can vary the above parameters to generate different datasets.");
-        System.out.println();
-    }
-    /**
-     * Checks whether a generated row links to every lookup table that is
-     * joined through a composite key. Single-column joins are skipped here:
-     * their values are drawn from the lookup table's own values.
-     *
-     * @param lookupCol2FactTableCol "lookupTable/pkColumn" -> fact table column
-     * @param columnValues           the generated row, indexed by the fact
-     *                               table columns' zero-based index
-     * @return false as soon as one composite key is not among the sampled
-     *         key combinations of its lookup table, true otherwise
-     */
-    private boolean matchAllCompositeKeys(HashMap<String, String> lookupCol2FactTableCol, LinkedList<String> columnValues) {
-        KylinConfig config = KylinConfig.getInstanceFromEnv();
-        for (String lookupTable : lookupTableKeys.keySet()) {
-            LinkedList<String> pkColumns = lookupTableKeys.get(lookupTable);
-            if (pkColumns.size() == 1)
-                continue;
-            // assemble the row's key for this lookup table, column by column
-            String[] comboKey = new String[pkColumns.size()];
-            int slot = 0;
-            for (String pkColumn : pkColumns) {
-                String factTableCol = lookupCol2FactTableCol.get(lookupTable + "/" + pkColumn);
-                ColumnDesc factColumn = MetadataManager.getInstance(config).getTableDesc(factTableName).findColumnByName(factTableCol);
-                comboKey[slot] = columnValues.get(factColumn.getZeroBasedIndex());
-                slot++;
-            }
-            boolean known = lookupTableCompositeKeyValues.get(lookupTable).contains(new Array<String>(comboKey));
-            if (!known)
-                return false;
-        }
-        return true;
-    }
-    /**
-     * Creates the value of a metric or directly used dimension column,
-     * honoring the column's entry in the generator config if there is one.
-     *
-     * @param cDesc column the value is generated for
-     * @return a random value, a value picked from the configured set, or a
-     *         value inside the configured range
-     * @throws Exception when the configured value set is missing or empty,
-     *                   or a range does not consist of exactly two values
-     */
-    private String createCell(ColumnDesc cDesc) throws Exception {
-        ColumnConfig cConfig = genConf.getColumnConfigByName(cDesc.getName());
-        // if the column is not configured, use random values
-        if (cConfig == null)
-            return createRandomCell(cDesc);
-        // a non-range, non-exclusive column still allows random values:
-        // a coin flip decides
-        if (!cConfig.isAsRange() && !cConfig.isExclusive() && r.nextBoolean())
-            return createRandomCell(cDesc);
-        // use specified values
-        ArrayList<String> valueSet = cConfig.getValueSet();
-        if (valueSet == null || valueSet.size() == 0)
-            throw new Exception("Did you forget to specify value set for " + cDesc.getName());
-        if (cConfig.isAsRange()) {
-            if (valueSet.size() != 2)
-                throw new Exception("Only two values can be set for range values, the column: " + cDesc.getName());
-            return createRandomCell(cDesc, valueSet);
-        }
-        return randomPick(valueSet);
-    }
-    /**
-     * Builds one fact table row with a value for every column of the fact
-     * table, in the order of the table's column list.
-     *
-     * @param factTableCol2LookupCol fk column -> "lookupTable/pkColumn" key
-     *                               into feasibleValues
-     * @param usedCols               columns that get a value from createCell
-     * @param defaultColumns         out parameter: receives the name of every
-     *                               column that was filled with a default
-     * @return the row's values
-     * @throws Exception propagated from createCell
-     */
-    private LinkedList<String> createRow(HashMap<String, String> factTableCol2LookupCol, HashSet<String> usedCols, HashSet<String> defaultColumns) throws Exception {
-        KylinConfig config = KylinConfig.getInstanceFromEnv();
-        LinkedList<String> row = new LinkedList<String>();
-        for (ColumnDesc cDesc : MetadataManager.getInstance(config).getTableDesc(factTableName).getColumns()) {
-            String colName = cDesc.getName();
-            String cell;
-            if (factTableCol2LookupCol.containsKey(colName)) {
-                // fk column: draw one of the values sampled from the lookup table
-                cell = randomPick(this.feasibleValues.get(factTableCol2LookupCol.get(colName)));
-            } else if (usedCols.contains(colName)) {
-                // metric or direct dimension column
-                cell = createCell(cDesc);
-            } else {
-                // otherwise this column is not useful in OLAP
-                cell = createDefaultsCell(cDesc.getTypeName());
-                defaultColumns.add(colName);
-            }
-            row.add(cell);
-        }
-        return row;
-    }
-    /**
-     * Generates rowCount rows and writes them, comma separated, one row per
-     * line, into a fresh temp file. Afterwards the column mappings are
-     * printed.
-     *
-     * A row that fails the composite key check is kept only while the
-     * number of such rows is below unlinkableRowCountMax; otherwise it is
-     * discarded and a new row is generated in its place.
-     *
-     * @param rowCount               number of rows to write
-     * @param factTableCol2LookupCol fk column -> lookup column mapping
-     * @param lookupCol2FactTableCol lookup column -> fk column mapping
-     * @param usedCols               columns filled through createCell
-     * @return absolute path of the temp file; on an IOException the stack
-     *         trace is printed and the JVM exits
-     * @throws Exception propagated from row generation
-     */
-    private String createTable(int rowCount, HashMap<String, String> factTableCol2LookupCol, HashMap<String, String> lookupCol2FactTableCol, HashSet<String> usedCols) throws Exception {
-        try {
-            File tempFile = File.createTempFile("ftg", ".tmp");
-            HashSet<String> defaultColumns = new HashSet<String>();
-            BufferedWriter writer = new BufferedWriter(new FileWriter(tempFile));
-            try {
-                // i only advances when a row is actually written
-                for (int i = 0; i < rowCount; ) {
-                    LinkedList<String> columnValues = createRow(factTableCol2LookupCol, usedCols, defaultColumns);
-                    if (!matchAllCompositeKeys(lookupCol2FactTableCol, columnValues)) {
-                        if (unlinkableRowCount < unlinkableRowCountMax) {
-                            unlinkableRowCount++;
-                        } else {
-                            continue;
-                        }
-                    }
-                    // join with commas; safe for a row without columns
-                    StringBuilder sb = new StringBuilder();
-                    boolean first = true;
-                    for (String c : columnValues) {
-                        if (!first)
-                            sb.append(',');
-                        sb.append(c);
-                        first = false;
-                    }
-                    writer.write(sb.toString());
-                    writer.newLine();
-                    i++;
-                }
-                writer.flush();
-            } finally {
-                // release the file handle even when row generation throws
-                writer.close();
-            }
-            printColumnMappings(factTableCol2LookupCol, usedCols, defaultColumns);
-            return tempFile.getAbsolutePath();
-        } catch (IOException e) {
-            e.printStackTrace();
-            System.exit(1);
-        }
-        return null;
-    }
-    /**
-     * Randomly create a fact table and put it to test_kylin_data table in hbase
-     *
-     * @param cubeName      name of the cube
-     * @param rowCount      expected row count generated
-     * @param linkableRatio the percentage of fact table rows that can be linked with all
-     *                      lookup table by INNER join
-     * @param randomSeed    random seed
-     */
-    public static void generate(String cubeName, String rowCount, String linkableRatio, String randomSeed, String joinType) throws Exception {
-        if (cubeName == null)
-            cubeName = "test_kylin_cube_with_slr_ready";
-        if (rowCount == null)
-            rowCount = "10000";
-        if (linkableRatio == null)
-            linkableRatio = "0.6";
-        //if (randomSeed == null)
-        // don't give it value
-        // String conflictRatio = "5";//this parameter do not allow configuring
-        // any more
-        FactTableGenerator generator = new FactTableGenerator();
-        long seed;
-        if (randomSeed != null) {
-            seed = Long.parseLong(randomSeed);
-        } else {
-            Random r = new Random();
-            seed = r.nextLong();
-        }
-        generator.init(cubeName, Integer.parseInt(rowCount), 5, Double.parseDouble(linkableRatio), seed);
-        generator.prepare();
-        generator.execute(joinType);
-    }
diff --git a/cube/src/main/java/com/kylinolap/cube/dataGen/ b/cube/src/main/java/com/kylinolap/cube/dataGen/
--- a/cube/src/main/java/com/kylinolap/cube/dataGen/
-package com.kylinolap.cube.dataGen;
-import java.util.ArrayList;
-import java.util.HashMap;
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.databind.JsonMappingException;
-import com.kylinolap.common.util.JsonUtil;
-/**
- * Created by honma on 5/29/14.
- */
-@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
-public class GenConfig {
-    @JsonProperty("columnConfigs")
-    private ArrayList<ColumnConfig> columnConfigs;
-    private HashMap<String, ColumnConfig> cache = new HashMap<String, ColumnConfig>();
-    /**
-     * @return the column configurations bound to the "columnConfigs" JSON
-     *         property
-     */
-    public ArrayList<ColumnConfig> getColumnConfigs() {
-        return this.columnConfigs;
-    }
-    /**
-     * Replaces the column configurations.
-     *
-     * @param columnConfigs the new configurations
-     */
-    public void setColumnConfigs(ArrayList<ColumnConfig> columnConfigs) {
-        this.columnConfigs = columnConfigs;
-        // lookups cached against the previous list would now be stale
-        this.cache.clear();
-    }
-    public ColumnConfig getColumnConfigByName(String columnName) {
-        columnName = columnName.toLowerCase();
-        if (cache.containsKey(columnName))
-            return cache.get(columnName);
-        for (ColumnConfig cConfig : columnConfigs) {
-            if (cConfig.getColumnName().toLowerCase().equals(columnName)) {
-                cache.put(columnName, cConfig);
-                return cConfig;
-            }
-        }
-        cache.put(columnName, null);
-        return null;
-    }
-    /**
-     * Reads a GenConfig from a JSON stream.
-     *
-     * @param stream JSON input
-     * @return the parsed config, or null when reading, parsing or mapping
-     *         fails (the stack trace is printed)
-     */
-    public static GenConfig loadConfig(InputStream stream) {
-        try {
-            return JsonUtil.readValue(stream, GenConfig.class);
-        } catch (IOException e) {
-            // JsonParseException and JsonMappingException are IOExceptions
-            // too, so this single arm handles all three cases alike
-            e.printStackTrace();
-            return null;
-        }
-    }
diff --git a/cube/src/main/java/com/kylinolap/cube/estimation/ b/cube/src/main/java/com/kylinolap/cube/estimation/
--- a/cube/src/main/java/com/kylinolap/cube/estimation/
-package com.kylinolap.cube.estimation;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.cuboid.Cuboid;
-import com.kylinolap.cube.cuboid.CuboidScheduler;
-import com.kylinolap.metadata.model.cube.CubeDesc;
-import com.kylinolap.metadata.model.cube.DimensionDesc;
-import com.kylinolap.metadata.model.cube.HierarchyDesc;
-import com.kylinolap.metadata.model.cube.MeasureDesc;
-import com.kylinolap.metadata.model.cube.RowKeyColDesc;
-import com.kylinolap.metadata.model.cube.RowKeyDesc;
-import com.kylinolap.metadata.model.schema.DataType;
-/**
- * Created by honma on 9/1/14.
- */
-public class CubeSizeEstimationCLI {
-    /**
-     * Plain holder for row key column bit indices, split by whether the
-     * column belongs to a hierarchy.
-     */
-    public static class RowKeyColInfo {
-        // one inner list of bit indices per hierarchy; estimateCuboidSpace
-        // walks each list in order until it meets a bit missing from the cuboid
-        public List<List<Integer>> hierachyColBitIndice;
-        // bit indices of the columns outside any hierarchy; each is checked
-        // against the cuboid on its own
-        public List<Integer> nonHierachyColBitIndice;
-    }
-    /**
-     * Estimates the total size of a cube by summing the estimated space of
-     * every cuboid reachable from the base cuboid through the scheduler's
-     * spanning cuboids.
-     *
-     * @param cubeName    name of the cube, resolved via CubeManager
-     * @param cardinality per-column cardinalities, passed on unchanged to
-     *                    estimateCuboidSpace
-     * @return the summed estimate
-     */
-    public static long estimatedCubeSize(String cubeName, long[] cardinality) {
-        KylinConfig envConfig = KylinConfig.getInstanceFromEnv();
-        CubeDesc cubeDesc = CubeManager.getInstance(envConfig).getCube(cubeName).getDescriptor();
-        CuboidScheduler scheduler = new CuboidScheduler(cubeDesc);
-        long baseCuboid = Cuboid.getBaseCuboidId(cubeDesc);
-        // used as a stack: elements are added to and taken from the head
-        LinkedList<Long> pending = new LinkedList<Long>();
-        pending.addFirst(baseCuboid);
-        long totalSpace = 0;
-        while (!pending.isEmpty()) {
-            long current = pending.removeFirst();
-            for (Long child : scheduler.getSpanningCuboid(current)) {
-                pending.addFirst(child);
-            }
-            totalSpace += estimateCuboidSpace(current, cardinality, cubeDesc);
-        }
-        return totalSpace;
-    }
-    /**
-     * Estimates the space of one cuboid as
-     * rowCount * (dimensionSpace + measureSpace).
-     *
-     * dimensionSpace adds up the row key space of every column present in
-     * the cuboid. rowCount multiplies the cardinality of every present
-     * non-hierarchy column and, per hierarchy, the cardinality of the last
-     * column of the leading run of present columns.
-     *
-     * @param cuboidID    bit set of the row key columns in the cuboid
-     * @param cardinality cardinalities, indexed by bit index
-     * @param cubeDesc    descriptor supplying row key and measures
-     * @return the estimated space
-     */
-    public static long estimateCuboidSpace(long cuboidID, long[] cardinality, CubeDesc cubeDesc) {
-        RowKeyColInfo rowKeyColInfo = extractRowKeyInfo(cubeDesc);
-        int[] rowKeySpaces = estimateRowKeyColSpace(cubeDesc.getRowkey(), cardinality);
-        int measureSpace = getMeasureSpace(cubeDesc);
-        int dimensionSpace = 0;
-        long rowCount = 1;
-        for (List<Integer> hierarchy : rowKeyColInfo.hierachyColBitIndice) {
-            // for hierarchy columns, the cardinality of the most detailed
-            // present column dominates
-            int present = 0;
-            while (present < hierarchy.size() && rowKeyColExists(hierarchy.get(present), cuboidID)) {
-                dimensionSpace += rowKeySpaces[hierarchy.get(present)];
-                present++;
-            }
-            if (present > 0)
-                rowCount *= cardinality[hierarchy.get(present - 1)];
-        }
-        for (int bit : rowKeyColInfo.nonHierachyColBitIndice) {
-            if (!rowKeyColExists(bit, cuboidID))
-                continue;
-            rowCount *= cardinality[bit];
-            dimensionSpace += rowKeySpaces[bit];
-        }
-        return rowCount * (dimensionSpace + measureSpace);
-    }
-    /**
-     * Computes the space of every row key column. Slot {@code i} of the result
-     * is taken from the column at position {@code length - 1 - i} of the row
-     * key column array. A column without a dictionary uses its declared
-     * length; a dictionary column is sized from {@code cardinality[i]}.
-     *
-     * @param rowKeyDesc  row key descriptor providing the columns
-     * @param cardinality per-column cardinalities, read at the same index as the result slot
-     * @return the per-column space estimates
-     * @throws IllegalStateException if a non-dictionary column declares length 0
-     */
-    private static int[] estimateRowKeyColSpace(RowKeyDesc rowKeyDesc, long[] cardinality) {
-        RowKeyColDesc[] columns = rowKeyDesc.getRowKeyColumns();
-        int count = columns.length;
-        int[] widths = new int[count];
-        for (int slot = 0; slot < count; slot++) {
-            RowKeyColDesc column = columns[count - 1 - slot];
-            if (column.getDictionary() != null) {
-                widths[slot] = estimateDictionaryColSpace(cardinality[slot]);
-                continue;
-            }
-            int fixedLength = column.getLength();
-            if (fixedLength == 0)
-                throw new IllegalStateException("The non-dictionary col " + column.getColumn() + " has length of 0");
-            widths[slot] = fixedLength;
-        }
-        return widths;
-    }
-    // TODO what if it's date dictionary?
-    /**
-     * Returns the number of whole bytes needed to hold the highest set bit of
-     * {@code cardinality}, i.e. the bytes required to save at most
-     * cardinality numbers.
-     *
-     * @param cardinality number of distinct values; a negative value has its top bit set and yields 8
-     * @return the byte count, between 1 and 8
-     * @throws IllegalStateException if cardinality is 0
-     */
-    private static int estimateDictionaryColSpace(long cardinality) {
-        if (cardinality == 0)
-            throw new IllegalStateException("the cardinality is 0");
-        // 1-based position of the highest set bit
-        int significantBits = Long.SIZE - Long.numberOfLeadingZeros(cardinality);
-        return (significantBits + 7) / 8;
-    }
-    /**
-     * Sums the space estimates of the return data types of all measures
-     * declared by the cube.
-     *
-     * @param cubeDesc cube descriptor whose measures are inspected
-     * @return the accumulated space estimate
-     */
-    private static int getMeasureSpace(CubeDesc cubeDesc) {
-        int total = 0;
-        for (MeasureDesc measure : cubeDesc.getMeasures()) {
-            total += measure.getFunction().getReturnDataType().getSpaceEstimate();
-        }
-        return total;
-    }
-    /**
-     * Tells whether the bit at position {@code bitIndex} of the cuboid id is set.
-     *
-     * @param bitIndex bit position to test (only its low six bits take part in the shift)
-     * @param cuboidID cuboid bit mask
-     * @return {@code true} when the bit is set
-     */
-    private static boolean rowKeyColExists(int bitIndex, long cuboidID) {
-        return ((cuboidID >>> bitIndex) & 1L) == 1L;
-    }
-    /**
-     * Splits the row key columns of a cube into hierarchy groups and
-     * stand-alone columns, both expressed as row key bit indices.
-     * <p>
-     * Every dimension with a hierarchy contributes one list holding the bit
-     * index of each hierarchy level, in the order the hierarchy lists them.
-     * All indices from 0 up to the number of row key columns that appear in
-     * no hierarchy are reported as non-hierarchy columns.
-     *
-     * @param cubeDesc cube descriptor to analyse
-     * @return the populated column info
-     */
-    private static RowKeyColInfo extractRowKeyInfo(CubeDesc cubeDesc) {
-        RowKeyDesc rowKey = cubeDesc.getRowkey();
-        RowKeyColInfo result = new RowKeyColInfo();
-        result.hierachyColBitIndice = new ArrayList<List<Integer>>();
-        result.nonHierachyColBitIndice = new ArrayList<Integer>();
-        HashSet<Integer> hierarchyBits = new HashSet<Integer>();
-        for (DimensionDesc dimension : cubeDesc.getDimensions()) {
-            if (dimension.getHierarchy() == null) {
-                continue;
-            }
-            LinkedList<Integer> levels = new LinkedList<Integer>();
-            for (HierarchyDesc level : dimension.getHierarchy()) {
-                int bit = rowKey.getColumnBitIndex(level.getColumnRef());
-                levels.add(bit);
-                hierarchyBits.add(bit);
-            }
-            result.hierachyColBitIndice.add(levels);
-        }
-        int columnCount = rowKey.getRowKeyColumns().length;
-        for (int bit = 0; bit < columnCount; bit++) {
-            if (hierarchyBits.contains(bit)) {
-                continue;
-            }
-            result.nonHierachyColBitIndice.add(bit);
-        }
-        return result;
-    }
diff --git a/cube/src/main/java/com/kylinolap/cube/exception/ b/cube/src/main/java/com/kylinolap/cube/exception/
--- a/cube/src/main/java/com/kylinolap/cube/exception/
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.cube.exception;
- * @author xduo
- * 
- */
-public class CubeIntegrityException extends Exception {
-    /**
-     * Creates the exception with the given detail message.
-     *
-     * @param string the detail message, handed unchanged to the
-     *               {@link Exception} constructor
-     */
-    public CubeIntegrityException(String string) {
-        super(string);
-    }
-    private static final long serialVersionUID = -7924187859607404390L;
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/ b/cube/src/main/java/com/kylinolap/cube/invertedindex/
--- a/cube/src/main/java/com/kylinolap/cube/invertedindex/
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.cube.invertedindex;
-import it.uniroma3.mat.extendedset.intset.ConciseSet;
-import java.nio.ByteBuffer;
-import java.nio.IntBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import com.kylinolap.common.util.BytesUtil;
-import com.kylinolap.dict.Dictionary;
- * @author yangli9
- * 
- */
-public class BitMapContainer implements ColumnValueContainer {
-    int valueLen;
-    int nValues;
-    int size;
-    ConciseSet[] sets;
-    boolean closedForChange;
-    transient byte[] temp;
-    /**
-     * Creates an empty, still modifiable container for one column.
-     * The bitmap sets themselves are allocated lazily on the first append.
-     *
-     * @param info digest supplying the value length and the maximum id of the column
-     * @param col  index of the column this container stores
-     */
-    public BitMapContainer(TableRecordInfoDigest info, int col) {
-        final int length = info.length(col);
-        final int maxId = info.getMaxID(col);
-        this.closedForChange = false;
-        this.sets = null;
-        this.size = 0;
-        this.valueLen = length;
-        // ids run from 0 to maxId, hence maxId + 1 distinct values
-        this.nValues = maxId + 1;
-        this.temp = new byte[length];
-    }
-    /**
-     * Decodes an unsigned value of {@code valueLen} bytes from the given
-     * bytes and appends it as the next row.
-     *
-     * @param valueBytes bytes holding the encoded value at their offset
-     */
-    @Override
-    public void append(ImmutableBytesWritable valueBytes) {
-        append(BytesUtil.readUnsigned(valueBytes.get(), valueBytes.getOffset(), valueLen));
-    }
-    /**
-     * Appends a value as the next row by adding the current row number to the
-     * bitmap set of that value.
-     *
-     * @param value the value id; the NULL id for this value length is stored
-     *              in the extra set at index {@code nValues}
-     * @throws IllegalStateException if the container is closed for change
-     */
-    public void append(int value) {
-        checkUpdateMode();
-        // set[nValues] holds NULL
-        final int slot = (value == Dictionary.NULL_ID[valueLen]) ? nValues : value;
-        sets[slot].add(size);
-        size++;
-    }
-    /**
-     * Writes the value of row {@code i} into the shared {@code temp} buffer
-     * and points {@code valueBytes} at that buffer, so the result is only
-     * valid until the next call.
-     *
-     * @param i          row number
-     * @param valueBytes receives the encoded value
-     */
-    @Override
-    public void getValueAt(int i, ImmutableBytesWritable valueBytes) {
-        BytesUtil.writeUnsigned(getValueIntAt(i), temp, 0, valueLen);
-        valueBytes.set(temp, 0, valueLen);
-    }
-    /**
-     * Finds the value of row {@code i} by probing the bitmap sets in
-     * ascending value order.
-     *
-     * @param i row number
-     * @return the first value whose set contains the row, or the NULL id for
-     *         this value length when none of the sets 0..nValues-1 contains it
-     */
-    public int getValueIntAt(int i) {
-        int candidate = 0;
-        while (candidate < nValues && !sets[candidate].contains(i)) {
-            candidate++;
-        }
-        // a row found in none of [0..nValues-1] must belong to set nValues (NULL)
-        return (candidate < nValues) ? candidate : Dictionary.NULL_ID[valueLen];
-    }
-    /**
-     * Guards a modification: rejects it when the container is closed and
-     * otherwise makes sure the {@code nValues + 1} bitmap sets exist.
-     *
-     * @throws IllegalStateException if the container is closed for change
-     */
-    private void checkUpdateMode() {
-        if (isClosedForChange())
-            throw new IllegalStateException();
-        if (sets != null)
-            return;
-        // one set per value plus a trailing one for NULL
-        sets = new ConciseSet[nValues + 1];
-        for (int slot = 0; slot < sets.length; slot++) {
-            sets[slot] = new ConciseSet();
-        }
-    }
-    /** Returns the {@code closedForChange} flag. */
-    private boolean isClosedForChange() {
-        return closedForChange;
-    }
-    /** Marks the container as closed; checkUpdateMode() rejects changes afterwards. */
-    @Override
-    public void closeForChange() {
-        closedForChange = true;
-    }
-    /** Returns the {@code size} counter, which append(int) increments once per appended value. */
-    @Override
-    public int getSize() {
-        return size;
-    }
-    /**
-     * Closes the container if it is still open and serializes every bitmap
-     * set, including the trailing NULL set.
-     *
-     * @return {@code nValues + 1} byte values, one per set, in set order
-     */
-    public List<ImmutableBytesWritable> toBytes() {
-        if (!isClosedForChange())
-            closeForChange();
-        final int setCount = nValues + 1;
-        List<ImmutableBytesWritable> serialized = new ArrayList<ImmutableBytesWritable>(setCount);
-        for (int slot = 0; slot < setCount; slot++) {
-            serialized.add(setToBytes(sets[slot]));
-        }
-        return serialized;
-    }
-    /**
-     * Rebuilds the bitmap sets from their serialized form and closes the
-     * container. The size becomes the sum of the sizes of all decoded sets.
-     *
-     * @param bytes one serialized set per value plus the NULL set; the
-     *              expected count of {@code nValues + 1} is only checked by an assert
-     */
-    public void fromBytes(List<ImmutableBytesWritable> bytes) {
-        assert nValues + 1 == bytes.size();
-        sets = new ConciseSet[nValues + 1];
-        size = 0;
-        int slot = 0;
-        while (slot <= nValues) {
-            ConciseSet decoded = bytesToSet(bytes.get(slot));
-            sets[slot] = decoded;
-            size += decoded.size();
-            slot++;
-        }
-        closedForChange = true;
-    }
-    /**
-     * Serializes one bitmap set; an empty set becomes an empty byte array.
-     *
-     * @param set the set to serialize
-     * @return the serialized bytes
-     */
-    private ImmutableBytesWritable setToBytes(ConciseSet set) {
-        // ConciseSet.toByteBuffer() throws exception when set is empty
-        byte[] raw = set.isEmpty() ? BytesUtil.EMPTY_BYTE_ARRAY : set.toByteBuffer().array();
-        return new ImmutableBytesWritable(raw);
-    }
-    /**
-     * Restores a bitmap set from serialized bytes, reading them as a sequence
-     * of ints; a null or zero-length input yields an empty set.
-     *
-     * @param bytes the serialized set
-     * @return the decoded set
-     */
-    private ConciseSet bytesToSet(ImmutableBytesWritable bytes) {
-        if (bytes.get() == null || bytes.getLength() == 0)
-            return new ConciseSet();
-        IntBuffer asInts = ByteBuffer.wrap(bytes.get(), bytes.getOffset(), bytes.getLength()).asIntBuffer();
-        int[] words = new int[asInts.capacity()];
-        asInts.get(words);
-        return new ConciseSet(words, false);
-    }
-    /**
-     * Combines, with multiplier 31, the closed flag, nValues, the bitmap
-     * sets, size and valueLen, in that order.
-     */
-    @Override
-    public int hashCode() {
-        final int[] parts = { closedForChange ? 1231 : 1237, nValues, Arrays.hashCode(sets), size, valueLen };
-        int hash = 1;
-        for (int part : parts) {
-            hash = 31 * hash + part;
-        }
-        return hash;
-    }
-    /**
-     * Two containers are equal when they are of the same class and agree on
-     * the closed flag, nValues, the bitmap sets, size and valueLen.
-     */
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null || getClass() != obj.getClass())
-            return false;
-        BitMapContainer that = (BitMapContainer) obj;
-        return closedForChange == that.closedForChange //
-                && nValues == that.nValues //
-                && Arrays.equals(sets, that.sets) //
-                && size == that.size //
-                && valueLen == that.valueLen;
-    }
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/ b/cube/src/main/java/com/kylinolap/cube/invertedindex/
--- a/cube/src/main/java/com/kylinolap/cube/invertedindex/
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.cube.invertedindex;
- * @author yangli9
- * 
- */
-public interface ColumnValueContainer {
-    /**
-     * Appends one value to the container.
-     *
-     * @param valueBytes the value to append, given as bytes
-     */
-    void append(ImmutableBytesWritable valueBytes);
-    /** Ends the modification phase of the container. */
-    void closeForChange();
-    /** Returns the size of the container. */
-    int getSize();
-    /**
-     * Reads the value at position {@code i} into {@code valueBytes}.
-     * Works only after closeForChange().
-     *
-     * @param i          position of the value
-     * @param valueBytes receives the value
-     */
-    void getValueAt(int i, ImmutableBytesWritable valueBytes);
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/ b/cube/src/main/java/com/kylinolap/cube/invertedindex/
--- a/cube/src/main/java/com/kylinolap/cube/invertedindex/
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.cube.invertedindex;
-import java.util.Arrays;
-import org.apache.hadoop.hbase.util.Bytes;
-import com.kylinolap.common.util.BytesUtil;
-import com.ning.compress.lzf.LZFDecoder;
-import com.ning.compress.lzf.LZFEncoder;
- * @author yangli9
- * 
- */
-public class CompressedValueContainer implements ColumnValueContainer {
-    int valueLen;
-    int cap;
-    int size;
-    byte[] uncompressed;
-    byte[] compressed;
-    /**
-     * Creates an empty container for one column. Neither the uncompressed
-     * nor the compressed buffer is allocated yet.
-     *
-     * @param info digest supplying the value length of the column
-     * @param col  index of the column this container stores
-     * @param cap  number of values the uncompressed buffer is sized for
-     */
-    public CompressedValueContainer(TableRecordInfoDigest info, int col, int cap) {
-        this.compressed = null;
-        this.uncompressed = null;
-        this.size = 0;
-        this.cap = cap;
-        this.valueLen = info.length(col);
-    }
-    /**
-     * Copies {@code valueLen} bytes of the given value to the end of the
-     * uncompressed buffer and counts it.
-     *
-     * @param valueBytes bytes holding the value at their offset
-     * @throws IllegalArgumentException if the container is closed for change
-     */
-    @Override
-    public void append(ImmutableBytesWritable valueBytes) {
-        checkUpdateMode();
-        final int writeAt = valueLen * size;
-        System.arraycopy(valueBytes.get(), valueBytes.getOffset(), uncompressed, writeAt, valueLen);
-        size++;
-    }
-    /**
-     * Points {@code valueBytes} at the {@code valueLen} bytes of value
-     * {@code i} inside the uncompressed buffer; nothing is copied.
-     *
-     * @param i          position of the value
-     * @param valueBytes receives the reference into the uncompressed buffer
-     */
-    @Override
-    public void getValueAt(int i, ImmutableBytesWritable valueBytes) {
-        valueBytes.set(uncompressed, valueLen * i, valueLen);
-    }
-    /**
-     * Guards a modification: rejects it when the container is closed and
-     * otherwise allocates the uncompressed buffer ({@code valueLen * cap}
-     * bytes) on first use.
-     *
-     * @throws IllegalArgumentException if the container is closed for change
-     */
-    private void checkUpdateMode() {
-        if (isClosedForChange())
-            throw new IllegalArgumentException();
-        if (uncompressed != null)
-            return;
-        uncompressed = new byte[valueLen * cap];
-    }
-    /** The container counts as closed as soon as the {@code compressed} field is non-null. */
-    private boolean isClosedForChange() {
-        return compressed != null;
-    }
-    /**
-     * Closes the container by LZF-encoding the used part of the uncompressed
-     * buffer into {@code compressed}.
-     *
-     * @throws IllegalArgumentException if the container is already closed
-     * @throws RuntimeException         if the encoding fails
-     */
-    @Override
-    public void closeForChange() {
-        checkUpdateMode();
-        final int usedBytes = valueLen * size;
-        try {
-            compressed = LZFEncoder.encode(uncompressed, 0, usedBytes);
-        } catch (Exception e) {
-            throw new RuntimeException("LZF encode failure", e);
-        }
-    }
-    /** Returns the {@code size} counter (incremented by append, recomputed by fromBytes). */
-    @Override
-    public int getSize() {
-        return size;
-    }
-    /**
-     * Closes the container if it is still open and returns the compressed
-     * buffer wrapped as bytes.
-     *
-     * @return the content of the {@code compressed} field
-     */
-    public ImmutableBytesWritable toBytes() {
-        if (!isClosedForChange()) {
-            closeForChange();
-        }
-        return new ImmutableBytesWritable(compressed);
-    }
-    /**
-     * Loads the container from LZF-compressed bytes. Size and capacity both
-     * become the decoded length divided by {@code valueLen}, and the
-     * container is marked closed.
-     *
-     * @param bytes the compressed values
-     * @throws RuntimeException if the decoding fails
-     */
-    public void fromBytes(ImmutableBytesWritable bytes) {
-        try {
-            uncompressed = LZFDecoder.decode(bytes.get(), bytes.getOffset(), bytes.getLength());
-        } catch (IOException e) {
-            throw new RuntimeException("LZF decode failure", e);
-        }
-        cap = uncompressed.length / valueLen;
-        size = cap;
-        // a non-null (here: empty) compressed array marks the container closed
-        compressed = BytesUtil.EMPTY_BYTE_ARRAY;
-    }
-    /**
-     * Hashes size, valueLen and the first {@code size * valueLen} bytes of
-     * the uncompressed buffer.
-     * <p>
-     * Only that prefix holds appended values; the remainder of the buffer is
-     * unused capacity. equals() limits itself to the same range, so hashing
-     * the whole array would let containers with equal content but different
-     * capacity produce different hash codes.
-     */
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + size;
-        result = prime * result + valueLen;
-        int contentHash = 1;
-        // a container that never allocated its buffer hashes like an empty one
-        if (uncompressed != null) {
-            final int usedBytes = size * valueLen;
-            for (int i = 0; i < usedBytes; i++) {
-                contentHash = prime * contentHash + uncompressed[i];
-            }
-        }
-        result = prime * result + contentHash;
-        return result;
-    }
-    /**
-     * Two containers are equal when they are of the same class, hold the same
-     * number of values of the same length, and the used part
-     * ({@code size * valueLen} bytes) of their uncompressed buffers matches.
-     */
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        CompressedValueContainer other = (CompressedValueContainer) obj;
-        if (size != other.size)
-            return false;
-        if (valueLen != other.valueLen)
-            return false;
-        // compare against the OTHER container's buffer; comparing the buffer
-        // with itself made any two containers of equal size/valueLen look equal
-        if (!Bytes.equals(uncompressed, 0, size * valueLen, other.uncompressed, 0, size * valueLen))
-            return false;
-        return true;
-    }
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/ b/cube/src/main/java/com/kylinolap/cube/invertedindex/
--- a/cube/src/main/java/com/kylinolap/cube/invertedindex/
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.cube.invertedindex;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import org.apache.hadoop.hbase.util.Pair;
-import com.kylinolap.common.util.BytesUtil;
- * @author yangli9
- */
-public class IIKeyValueCodec {
-    public static final int SHARD_LEN = 2;
-    public static final int TIMEPART_LEN = 8;
-    public static final int COLNO_LEN = 2;
-    private TableRecordInfoDigest infoDigest;
-    /**
-     * Creates a codec bound to the given table record digest.
-     *
-     * @param info digest stored in {@code infoDigest}; encodeKey reads column
-     *             lengths from it and decodeKeyValue hands it to the decoder
-     */
-    public IIKeyValueCodec(TableRecordInfoDigest info) {
-        this.infoDigest = info;
-    }
-    /**
-     * Encodes all column containers of a slice into key/value pairs, column
-     * by column.
-     *
-     * @param slice the slice to encode
-     * @return the key/value pairs in column order
-     * @throws IllegalArgumentException if a container is neither a
-     *                                  BitMapContainer nor a CompressedValueContainer
-     */
-    public Collection<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> encodeKeyValue(Slice slice) {
-        ArrayList<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> keyValues = Lists.newArrayList();
-        int col = 0;
-        for (ColumnValueContainer container : slice.containers) {
-            if (container instanceof BitMapContainer) {
-                collectKeyValues(slice, col, (BitMapContainer) container, keyValues);
-            } else if (container instanceof CompressedValueContainer) {
-                collectKeyValues(slice, col, (CompressedValueContainer) container, keyValues);
-            } else {
-                throw new IllegalArgumentException("Unkown container class " + container.getClass());
-            }
-            col++;
-        }
-        return keyValues;
-    }
-    /**
-     * Adds the single key/value pair of a compressed value container. The
-     * key is encoded with column value -1, i.e. without a column value part.
-     */
-    private void collectKeyValues(Slice slice, int col, CompressedValueContainer container, //
-            ArrayList<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> result) {
-        result.add(new Pair<ImmutableBytesWritable, ImmutableBytesWritable>( //
-                encodeKey(slice.getShard(), slice.getTimestamp(), col, -1), //
-                container.toBytes()));
-    }
-    /**
-     * Adds one key/value pair per serialized bitmap of a bitmap container.
-     * The position of the bitmap in the serialized list is used as the
-     * column value part of the key.
-     */
-    private void collectKeyValues(Slice slice, int col, BitMapContainer container, //
-            ArrayList<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> result) {
-        int colValue = 0;
-        for (ImmutableBytesWritable bitmap : container.toBytes()) {
-            ImmutableBytesWritable key = encodeKey(slice.getShard(), slice.getTimestamp(), col, colValue);
-            result.add(new Pair<ImmutableBytesWritable, ImmutableBytesWritable>(key, bitmap));
-            colValue++;
-        }
-    }
-    /**
-     * Encodes a key into a fresh 20-byte buffer and returns the used part.
-     *
-     * @param shard     shard number
-     * @param timestamp slice timestamp
-     * @param col       column number
-     * @param colValue  column value, or a negative number to leave it out
-     * @return the encoded key
-     */
-    ImmutableBytesWritable encodeKey(short shard, long timestamp, int col, int colValue) {
-        byte[] buf = new byte[20];
-        return new ImmutableBytesWritable(buf, 0, encodeKey(shard, timestamp, col, colValue, buf, 0));
-    }
-    /**
-     * Writes a key into {@code buf} starting at {@code offset}: shard
-     * (SHARD_LEN bytes), timestamp (TIMEPART_LEN bytes), column number
-     * (COLNO_LEN bytes) and, only for a non-negative {@code colValue}, the
-     * column value in as many bytes as the digest reports for the column.
-     *
-     * @return the number of bytes written
-     */
-    int encodeKey(short shard, long timestamp, int col, int colValue, byte[] buf, int offset) {
-        int pos = offset;
-        BytesUtil.writeUnsigned(shard, buf, pos, SHARD_LEN);
-        pos += SHARD_LEN;
-        BytesUtil.writeLong(timestamp, buf, pos, TIMEPART_LEN);
-        pos += TIMEPART_LEN;
-        BytesUtil.writeUnsigned(col, buf, pos, COLNO_LEN);
-        pos += COLNO_LEN;
-        if (colValue < 0) {
-            return pos - offset;
-        }
-        final int colLen = infoDigest.length(col);
-        BytesUtil.writeUnsigned(colValue, buf, pos, colLen);
-        return pos + colLen - offset;
-    }
-    /**
-     * Wraps the given key/value pairs in a Decoder that turns them into slices.
-     *
-     * @param kvs the key/value pairs to decode; the decoder notes that it
-     *            expects the keys in order
-     * @return an iterable over the decoded slices
-     */
-    public Iterable<Slice> decodeKeyValue(Iterable<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> kvs) {
-        return new Decoder(infoDigest, kvs);
-    }
-    /**
-     * Lazily regroups a stream of key/value pairs into slices. Consecutive
-     * pairs sharing shard and timestamp form one slice; within a slice, a key
-     * without a column value part carries a whole compressed value container,
-     * while keys with a column value part carry the bitmaps of one bitmap
-     * container, one bitmap per value.
-     * <p>
-     * All iterators handed out by {@link #iterator()} share the state of
-     * this decoder, so the input can be walked only once.
-     */
-    private static class Decoder implements Iterable<Slice> {
-        TableRecordInfoDigest info;
-        Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> iterator;
-        // the decoded slice waiting to be handed out, or null if none is ready
-        Slice next = null;
-        // fields of the key decoded last
-        short curShard = Short.MIN_VALUE;
-        long curSliceTimestamp = Long.MIN_VALUE;
-        int curCol = -1;
-        int curColValue = -1;
-        // fields of the key consumed before it
-        short lastShard = Short.MIN_VALUE;
-        long lastSliceTimestamp = Long.MIN_VALUE;
-        int lastCol = -1;
-        // containers of the slice under construction
-        ColumnValueContainer[] containers = null;
-        // bitmaps collected so far for the bitmap column under construction
-        List<ImmutableBytesWritable> bitMapValues = Lists.newArrayList();
-        Decoder(TableRecordInfoDigest info, Iterable<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> kvs) {
-            this.info = info;
-            this.iterator = kvs.iterator();
-        }
-        /** Consumes input until a complete slice is available in {@code next} or the input ends. */
-        private void goToNext() {
-            if (next != null) { // was not fetched
-                return;
-            }
-            // NOTE the input keys are ordered
-            while (next == null && iterator.hasNext()) {
-                Pair<ImmutableBytesWritable, ImmutableBytesWritable> kv = iterator.next();
-                ImmutableBytesWritable k = kv.getFirst();
-                ImmutableBytesWritable v = kv.getSecond();
-                decodeKey(k);
-                // a change of shard or timestamp completes the slice built so far
-                if (curShard != lastShard || curSliceTimestamp != lastSliceTimestamp) {
-                    makeNext();
-                }
-                consumeCurrent(v);
-            }
-            // end of input: flush whatever has been collected
-            if (next == null) {
-                makeNext();
-            }
-        }
-        /** Decodes shard, timestamp, column and (if present) column value from a key. */
-        private void decodeKey(ImmutableBytesWritable k) {
-            byte[] buf = k.get();
-            int i = k.getOffset();
-            curShard = (short) BytesUtil.readUnsigned(buf, i, SHARD_LEN);
-            i += SHARD_LEN;
-            curSliceTimestamp = BytesUtil.readLong(buf, i, TIMEPART_LEN);
-            i += TIMEPART_LEN;
-            curCol = BytesUtil.readUnsigned(buf, i, COLNO_LEN);
-            i += COLNO_LEN;
-            if (i - k.getOffset() < k.getLength()) {
-                //bitmap
-                int colLen = info.length(curCol);
-                curColValue = BytesUtil.readUnsigned(buf, i, colLen);
-                i += colLen;
-            } else {
-                //value list
-                curColValue = -1;
-            }
-        }
-        /** Adds the value belonging to the key decoded last to the slice under construction. */
-        private void consumeCurrent(ImmutableBytesWritable v) {
-            if (curCol != lastCol && bitMapValues.size() > 0) { // end of a bitmap container
-                addBitMapContainer(lastCol);
-            }
-            if (curColValue < 0) {
-                CompressedValueContainer c = new CompressedValueContainer(info, curCol, 0);
-                c.fromBytes(v);
-                addContainer(curCol, c);
-            } else {
-                assert curColValue == bitMapValues.size();
-                // make a copy, the value object from caller is typically reused through iteration
-                bitMapValues.add(new ImmutableBytesWritable(v));
-            }
-            lastShard = curShard;
-            lastSliceTimestamp = curSliceTimestamp;
-            lastCol = curCol;
-        }
-        /** Turns the collected containers, if any, into the next slice and resets the collection state. */
-        private void makeNext() {
-            if (bitMapValues.isEmpty() == false) {
-                addBitMapContainer(lastCol);
-            }
-            if (containers != null) {
-                next = new Slice(info, lastShard, lastSliceTimestamp, containers);
-            }
-            lastSliceTimestamp = Long.MIN_VALUE;
-            lastCol = -1;
-            containers = null;
-            bitMapValues.clear();
-        }
-        /** Builds a bitmap container for {@code col} from the collected bitmaps. */
-        private void addBitMapContainer(int col) {
-            BitMapContainer c = new BitMapContainer(info, col);
-            c.fromBytes(bitMapValues);
-            addContainer(col, c);
-            bitMapValues.clear();
-        }
-        /** Stores a container at its column position, allocating the container array on first use. */
-        private void addContainer(int col, ColumnValueContainer c) {
-            if (containers == null) {
-                containers = new ColumnValueContainer[info.getColumnCount()];
-            }
-            containers[col] = c;
-        }
-        @Override
-        public Iterator<Slice> iterator() {
-            return new Iterator<Slice>() {
-                @Override
-                public boolean hasNext() {
-                    goToNext();
-                    return next != null;
-                }
-                @Override
-                public Slice next() {
-                    // advance here as well, so next() works without a preceding hasNext()
-                    goToNext();
-                    if (next == null) {
-                        throw new java.util.NoSuchElementException();
-                    }
-                    Slice result = next;
-                    next = null;
-                    return result;
-                }
-                @Override
-                public void remove() {
-                    throw new UnsupportedOperationException();
-                }
-            };
-        }
-    }
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/ b/cube/src/main/java/com/kylinolap/cube/invertedindex/
--- a/cube/src/main/java/com/kylinolap/cube/invertedindex/
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.cube.invertedindex;
-import java.util.Iterator;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.util.Pair;
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.common.util.HadoopUtil;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
- * @author yangli9
- * 
- */
-public class InvertedIndexCLI {
-    /**
-     * Prints every record of an inverted index sequence file, followed by
-     * the total record count.
-     *
-     * @param args {@code args[0]} is the cube name, {@code args[1]} the path
-     *             of the sequence file; neither is checked for presence
-     * @throws IOException declared for failures while opening the sequence file
-     */
-    public static void main(String[] args) throws IOException {
-        Configuration hconf = HadoopUtil.getDefaultConfiguration();
-        CubeManager mgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-        String cubeName = args[0];
-        CubeInstance cube = mgr.getCube(cubeName);
-        String path = args[1];
-        System.out.println("Reading from " + path + " ...");
-        // the record layout is taken from the first segment of the cube
-        TableRecordInfo info = new TableRecordInfo(cube.getFirstSegment());
-        IIKeyValueCodec codec = new IIKeyValueCodec(info);
-        int count = 0;
-        for (Slice slice : codec.decodeKeyValue(readSequenceKVs(hconf, path))) {
-            for (TableRecordBytes rec : slice) {
-                System.out.println((TableRecord)rec);
-                count++;
-            }
-        }
-        System.out.println("Total " + count + " records");
-    }
-    /**
-     * Opens a sequence file and exposes its entries as key/value pairs.
-     * <p>
-     * The same key, value and pair objects are reused for every entry, so a
-     * caller must copy whatever it wants to keep. The reader is opened once
-     * and shared by all iterators of the returned iterable; it is closed
-     * quietly when a read reports no further entry or fails.
-     *
-     * @param hconf Hadoop configuration used to open the file
-     * @param path  path of the sequence file
-     * @return an iterable over the entries of the file
-     * @throws IOException if the reader cannot be created
-     */
-    public static Iterable<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> readSequenceKVs(Configuration hconf, String path) throws IOException {
-        final Reader reader = new Reader(hconf, SequenceFile.Reader.file(new Path(path)));
-        return new Iterable<Pair<ImmutableBytesWritable, ImmutableBytesWritable>>() {
-            @Override
-            public Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> iterator() {
-                return new Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>>() {
-                    ImmutableBytesWritable k = new ImmutableBytesWritable();
-                    ImmutableBytesWritable v = new ImmutableBytesWritable();
-                    Pair<ImmutableBytesWritable, ImmutableBytesWritable> pair = new Pair<ImmutableBytesWritable, ImmutableBytesWritable>(k, v);
-                    // outcome of the last read that next() has not consumed yet;
-                    // null means the following entry still has to be read
-                    Boolean fetched = null;
-                    @Override
-                    public boolean hasNext() {
-                        // read at most once per entry, so repeated hasNext() calls skip nothing
-                        if (fetched == null) {
-                            fetched = readEntry();
-                        }
-                        return fetched;
-                    }
-                    /** Reads the following entry into k and v; closes the reader when there is none. */
-                    private boolean readEntry() {
-                        boolean hasNext = false;
-                        try {
-                            hasNext = reader.next(k, v);
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
-                        } finally {
-                            if (hasNext == false) {
-                                IOUtils.closeQuietly(reader);
-                            }
-                        }
-                        return hasNext;
-                    }
-                    @Override
-                    public Pair<ImmutableBytesWritable, ImmutableBytesWritable> next() {
-                        if (!hasNext()) {
-                            throw new java.util.NoSuchElementException();
-                        }
-                        // the entry is consumed; the following hasNext() reads again
-                        fetched = null;
-                        return pair;
-                    }
-                    @Override
-                    public void remove() {
-                        throw new UnsupportedOperationException();
-                    }
-                };
-            }
-        };
-    }
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/ b/cube/src/main/java/com/kylinolap/cube/invertedindex/
--- a/cube/src/main/java/com/kylinolap/cube/invertedindex/
-package com.kylinolap.cube.invertedindex;
-public class ShardingHash {
-    static HashFunction hashFunc = Hashing.murmur3_128();
-    /**
-     * Hashes an int with the shared {@code hashFunc} and returns the hash as a long.
-     *
-     * @param integer the value to hash
-     * @return the hash code converted with {@code asLong()}
-     */
-    public static long hashInt(int integer) {
-        return hashFunc.newHasher().putInt(integer).hash().asLong();
-    }
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/ b/cube/src/main/java/com/kylinolap/cube/invertedindex/
--- a/cube/src/main/java/com/kylinolap/cube/invertedindex/
-package com.kylinolap.cube.invertedindex;
-import java.util.List;
-public class ShardingSliceBuilder {
-    SliceBuilder[] builders;
-    /**
-     * Creates one slice builder per shard, as many as the sharding setting
-     * of the descriptor says.
-     *
-     * @param info table record info handed to every slice builder
-     */
-    public ShardingSliceBuilder(TableRecordInfo info) {
-        final int shardCount = info.getDescriptor().getSharding();
-        builders = new SliceBuilder[shardCount];
-        short shard = 0;
-        while (shard < shardCount) {
-            builders[shard] = new SliceBuilder(info, shard);
-            shard++;
-        }
-    }
-    /**
-     * Appends a record to the slice builder of the record's shard.
-     * NOTE: record must be appended in time order.
-     *
-     * @param rec the record to append
-     * @return whatever the slice builder of that shard returns for the append
-     */
-    public Slice append(TableRecord rec) {
-        return builders[rec.getShard()].append(rec);
-    }
-    /**
-     * Closes every slice builder and gathers the slices they return.
-     *
-     * @return the non-null slices, in builder order
-     */
-    public List<Slice> close() {
-        List<Slice> closed = Lists.newArrayList();
-        for (int shard = 0; shard < builders.length; shard++) {
-            Slice last = builders[shard].close();
-            if (last == null) {
-                continue;
-            }
-            closed.add(last);
-        }
-        return closed;
-    }
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/ b/cube/src/main/java/com/kylinolap/cube/invertedindex/
--- a/cube/src/main/java/com/kylinolap/cube/invertedindex/
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.cube.invertedindex;
-import java.util.Iterator;
- * Within a partition (per timestampGranularity), records are further sliced
- * (per sliceLength) to fit into HBASE cell.
- *
- * @author yangli9
- */
-public class Slice implements Iterable<TableRecordBytes>, Comparable<Slice> {
-    TableRecordInfoDigest info;
-    int nColumns;
-    short shard;
-    long timestamp;
-    int nRecords;
-    ColumnValueContainer[] containers;
-    /**
-     * Creates a slice over the given column containers. The record count is
-     * taken from the first container.
-     *
-     * @param info       digest describing the record layout
-     * @param shard      shard the slice belongs to
-     * @param timestamp  timestamp of the slice
-     * @param containers one container per column; that their number matches
-     *                   the column count and that they all hold the same
-     *                   number of records is only checked by asserts
-     */
-    Slice(TableRecordInfoDigest info, short shard, long timestamp, ColumnValueContainer[] containers) {
-        this.info = info;
-        this.nColumns = info.getColumnCount();
-        this.shard = shard;
-        this.timestamp = timestamp;
-        this.nRecords = containers[0].getSize();
-        this.containers = containers;
-        assert nColumns == containers.length;
-        for (int i = 0; i < nColumns; i++) {
-            assert nRecords == containers[i].getSize();
-        }
-    }
-    /** Returns the shard given to the constructor. */
-    public short getShard() {
-        return shard;
-    }
-    /** Returns the timestamp given to the constructor. */
-    public long getTimestamp() {
-        return timestamp;
-    }
-    /**
-     * Standard iterator of Slice will return an iterator of TableRecordBytes,
-     * which cannot be printed/formatted to readable text.
-     * By invoking this API client can avoid code like:
-     * <p/>
-     * for (TableRecordBytes rec : slice) {
-     * TableRecord realRecord = (TableRecord) rec.clone();
-     * }
-     * <p/>
-     * Note this API cannot be called simultaneously with iterator()
-     *
-     * @return an iterator that casts every record of {@link #iterator()} to
-     *         TableRecord; removal is not supported
-     */
-    public Iterator<TableRecord> readableIterator() {
-        final Iterator<TableRecordBytes> innerIterator = iterator();
-        return new Iterator<TableRecord>() {
-            @Override
-            public boolean hasNext() {
-                return innerIterator.hasNext();
-            }
-            @Override
-            public TableRecord next() {
-                return (TableRecord) innerIterator.next();
-            }
-            @Override
-            public void remove() {
-                throw new UnsupportedOperationException();
-            }
-        };
-    }
-    /**
-     * Iterates over the records of the slice. A single record object is
-     * refilled and returned on every step, so a caller that needs to keep a
-     * record must copy it before advancing. Removal is not supported.
-     */
-    @Override
-    public Iterator<TableRecordBytes> iterator() {
-        return new Iterator<TableRecordBytes>() {
-            int cursor = 0;
-            TableRecord reused = new TableRecord(info);
-            ImmutableBytesWritable scratch = new ImmutableBytesWritable();
-            @Override
-            public boolean hasNext() {
-                return cursor < nRecords;
-            }
-            @Override
-            public TableRecordBytes next() {
-                int col = 0;
-                while (col < nColumns) {
-                    containers[col].getValueAt(cursor, scratch);
-                    reused.setValueBytes(col, scratch);
-                    col++;
-                }
-                cursor++;
-                return reused;
-            }
-            @Override
-            public void remove() {
-                throw new UnsupportedOperationException();
-            }
-        };
-    }
-    /* (non-Javadoc)
-     * @see java.lang.Object#hashCode()
-     */
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + ((info == null) ? 0 : info.hashCode());
-        result = prime * result + shard;
-        result = prime * result + (int) (timestamp ^ (timestamp >>> 32));
-        return result;
-    }
-    /* (non-Javadoc)
-     * @see java.lang.Object#equals(java.lang.Object)
-     */
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        Slice other = (Slice) obj;
-        if (info == null) {
-            if ( != null)
-                return false;
-        } else if (!info.equals(
-            return false;
-        if (shard != other.shard)
-            return false;
-        if (timestamp != other.timestamp)
-            return false;
-        return true;
-    }
-    @Override
-    public int compareTo(Slice o) {
-        int comp = this.shard - o.shard;
-        if (comp != 0)
-            return comp;
-        comp = (int) (this.timestamp - o.timestamp);
-        return comp;
-    }
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/ b/cube/src/main/java/com/kylinolap/cube/invertedindex/
--- a/cube/src/main/java/com/kylinolap/cube/invertedindex/
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.cube.invertedindex;
-/**
- * @author yangli9
- * 
- */
-public class SliceBuilder {
-    TableRecordInfo info;
-    private int nColumns;
-    int nRecordsCap;
-    short shard;
-    long sliceTimestamp;
-    int nRecords;
-    private ColumnValueContainer[] containers;
-    transient ImmutableBytesWritable temp = new ImmutableBytesWritable();
-    /**
-     * Creates a builder that collects records of one shard into slices.
-     * The slice capacity is the descriptor's slice size, but at least 1.
-     *
-     * @param info  record layout and descriptor used to create the column containers
-     * @param shard the only shard whose records this builder accepts
-     */
-    public SliceBuilder(TableRecordInfo info, short shard) {
-        this.info = info;
-        this.nColumns = info.getColumnCount();
-        this.nRecordsCap = Math.max(1, info.getDescriptor().getSliceSize());
-        this.shard = shard;
-        this.sliceTimestamp = Long.MIN_VALUE;
-        this.nRecords = 0;
-        this.containers = null;
-        doneSlice(); // init containers
-    }
-    /**
-     * Finishes the slice in progress and prepares fresh containers.
-     *
-     * @return the finished slice, or null when no record has been appended
-     */
-    private Slice doneSlice() {
-        Slice r = null;
-        if (nRecords > 0) {
-            for (int i = 0; i < nColumns; i++) {
-                containers[i].closeForChange();
-            }
-            r = new Slice(info, shard, sliceTimestamp, containers);
-        }
-        // reset for next slice
-        nRecords = 0;
-        containers = new ColumnValueContainer[nColumns];
-        for (int i : info.getDescriptor().getBitmapColumns()) {
-            containers[i] = new BitMapContainer(info, i);
-        }
-        for (int i : info.getDescriptor().getValueColumns()) {
-            containers[i] = new CompressedValueContainer(info, i, nRecordsCap);
-        }
-        for (int i : info.getDescriptor().getMetricsColumns()) {
-            containers[i] = new CompressedValueContainer(info, i, nRecordsCap);
-        }
-        return r;
-    }
-    /**
-     * Appends one record to the slice in progress.
-     * NOTE: record must be appended in time order
-     *
-     * @param rec record to append; its shard must equal this builder's shard
-     * @return the slice that was completed because it was full before this
-     *         record was added, or null when no slice was completed
-     * @throws IllegalStateException if the record belongs to another shard, or
-     *         its timestamp is earlier than the current slice timestamp when
-     *         it starts a new slice
-     */
-    public Slice append(TableRecord rec) {
-        short recShard = rec.getShard();
-        if (recShard != shard)
-            throw new IllegalStateException("Record shard " + recShard + " does not match builder shard " + shard);
-        Slice doneSlice = null;
-        if (isFull()) {
-            doneSlice = doneSlice();
-        }
-        // the first record of a slice decides the slice timestamp
-        if (nRecords == 0) {
-            sliceTimestamp = increaseSliceTimestamp(rec.getTimestamp());
-        }
-        nRecords++;
-        for (int i = 0; i < nColumns; i++) {
-            rec.getValueBytes(i, temp);
-            containers[i].append(temp);
-        }
-        return doneSlice;
-    }
-    /**
-     * Derives the timestamp of the next slice from its first record.
-     *
-     * @param timestamp timestamp of the first record of the new slice
-     * @return the given timestamp, or timestamp + 1 when it equals the current
-     *         slice timestamp, so that slice timestamps strictly increase
-     * @throws IllegalStateException if timestamp is earlier than the current slice timestamp
-     */
-    private long increaseSliceTimestamp(long timestamp) {
-        if (timestamp < sliceTimestamp)
-            throw new IllegalStateException("Record timestamp " + timestamp + " is earlier than current slice timestamp " + sliceTimestamp);
-        if (timestamp == sliceTimestamp)
-            return timestamp + 1; // ensure slice timestamp increases
-        else
-            return timestamp;
-    }
-    /**
-     * Finishes the slice in progress and resets the builder to its initial state.
-     *
-     * @return the last slice, or null when it holds no record
-     */
-    public Slice close() {
-        // doneSlice() already resets nRecords and the containers
-        Slice doneSlice = doneSlice();
-        this.sliceTimestamp = Long.MIN_VALUE;
-        return doneSlice;
-    }
-    /**
-     * @return true when the slice in progress has reached its record capacity
-     */
-    private boolean isFull() {
-        return nRecords >= nRecordsCap;
-    }
-}
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/ b/cube/src/main/java/com/kylinolap/cube/invertedindex/
--- a/cube/src/main/java/com/kylinolap/cube/invertedindex/
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.cube.invertedindex;
-import com.kylinolap.dict.DateStrDictionary;
-import org.apache.hadoop.hbase.util.Bytes;
-/**
- * @author yangli9
- */
-public class TableRecord extends TableRecordBytes {
-    /**
-     * Creates a record whose buffer is sized by the given info and reset.
-     *
-     * @param info must actually be a TableRecordInfo
-     * @throws IllegalStateException if info is not a TableRecordInfo
-     */
-    public TableRecord(TableRecordInfoDigest info) {
-        super();
-        if (!(info instanceof TableRecordInfo)) {
-            throw new IllegalStateException("Table Record must be initialized with a TableRecordInfo");
-        }
-        this.info = info;
-        this.buf = new byte[info.byteFormLen];
-        reset();
-    }
-    /**
-     * Copy constructor: shares the info of the other record and copies its buffer.
-     */
-    public TableRecord(TableRecord another) {
-        super();
-        this.info = another.info;
-        this.buf = Bytes.copy(another.buf);
-    }
-    @Override
-    public Object clone() {
-        return new TableRecord(this);
-    }
-    /**
-     * @return the value of the timestamp column, converted to milliseconds
-     *         by DateStrDictionary.stringToMillis
-     */
-    public long getTimestamp() {
-        String str = getValueString(info().getTimestampColumn());
-        return DateStrDictionary.stringToMillis(str);
-    }
-    /**
-     * @return the length of the given column as reported by the record info
-     */
-    public int length(int col) {
-        return info.length(col);
-    }
-    /**
-     * Stores a string value: metrics columns go through the column codec,
-     * all other columns are stored as the dictionary ID of the value.
-     */
-    public void setValueString(int col, String value) {
-        if (info().isMetrics(col)) {
-            LongWritable v = info().codec(col).valueOf(value);
-            setValueMetrics(col, v);
-        } else {
-            int id = info().dict(col).getIdFromValue(value);
-            setValueID(col, id);
-        }
-    }
-    /**
-     * Reads a column as string: metrics columns go through the column codec,
-     * all other columns are looked up in the dictionary by their stored ID.
-     */
-    public String getValueString(int col) {
-        if (info().isMetrics(col))
-            return info().codec(col).toString(getValueMetrics(col));
-        else
-            return info().dict(col).getValueFromId(getValueID(col));
-    }
-    /**
-     * Copies the column's bytes from the given writable into this record's buffer.
-     */
-    public void setValueBytes(int col, ImmutableBytesWritable bytes) {
-        System.arraycopy(bytes.get(), bytes.getOffset(), buf, info.offset(col), info.length(col));
-    }
-    /**
-     * Points the given writable at the column's bytes inside this record's
-     * buffer; no bytes are copied.
-     */
-    public void getValueBytes(int col, ImmutableBytesWritable bytes) {
-        bytes.set(buf, info.offset(col), info.length(col));
-    }
-    private void setValueMetrics(int col, LongWritable value) {
-        info().codec(col).write(value, buf, info.offset(col));
-    }
-    private LongWritable getValueMetrics(int col) {
-        return info().codec(col).read(buf, info.offset(col));
-    }
-    /**
-     * @return the shard of this record, derived from the hash of the
-     *         timestamp column's ID modulo the descriptor's sharding
-     */
-    public short getShard() {
-        int timestampID = getValueID(info().getTimestampColumn());
-        // take the remainder before abs(): Math.abs(Integer.MIN_VALUE) is still negative,
-        // which would yield a negative shard; all other hash values give the same result as before
-        return (short) Math.abs(ShardingHash.hashInt(timestampID) % info().getDescriptor().getSharding());
-    }
-    /**
-     * @return the record info, cast to TableRecordInfo (guaranteed by the constructor)
-     */
-    public TableRecordInfo info() {
-        return (TableRecordInfo) info;
-    }
-    /**
-     * @return the column values as strings, comma separated, in square brackets
-     */
-    @Override
-    public String toString() {
-        // named "text" so that it does not shadow the buf field
-        StringBuilder text = new StringBuilder("[");
-        for (int col = 0; col < info.getColumnCount(); col++) {
-            if (col > 0)
-                text.append(",");
-            text.append(getValueString(col));
-        }
-        text.append("]");
-        return text.toString();
-    }
-}
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/ b/cube/src/main/java/com/kylinolap/cube/invertedindex/
--- a/cube/src/main/java/com/kylinolap/cube/invertedindex/
-package com.kylinolap.cube.invertedindex;
-import com.kylinolap.common.util.BytesUtil;
-import com.kylinolap.dict.Dictionary;
-import org.apache.hadoop.hbase.util.Bytes;
-import java.util.Arrays;
-/**
- * Created by honma on 11/10/14.
- */
-public class TableRecordBytes implements Cloneable {
-    TableRecordInfoDigest info;
-    byte[] buf; // consecutive column value IDs (encoded by dictionary)
-    /**
-     * Creates an empty record; info and buf are left unset.
-     */
-    public TableRecordBytes() {
-    }
-    /**
-     * Writes the ID as an unsigned value into the column's byte range of the buffer.
-     */
-    public void setValueID(int col, int id) {
-        BytesUtil.writeUnsigned(id, buf, info.offset(col), info.length(col));
-    }
-    /**
-     * @return the unsigned value read from the column's byte range of the buffer
-     */
-    public int getValueID(int col) {
-        return BytesUtil.readUnsigned(buf, info.offset(col), info.length(col));
-    }
-    /**
-     * @return the internal buffer itself, not a copy
-     */
-    public byte[] getBytes() {
-        return buf;
-    }
-    /**
-     * Overwrites the whole buffer with length bytes taken from bytes at offset.
-     * length is expected to equal the buffer length (checked by assertion only).
-     */
-    public void setBytes(byte[] bytes, int offset, int length) {
-        assert buf.length == length;
-        System.arraycopy(bytes, offset, buf, 0, length);
-    }
-    /**
-     * Fills the whole buffer with Dictionary.NULL.
-     */
-    public void reset() {
-        Arrays.fill(buf, Dictionary.NULL);
-    }
-    /**
-     * Creates a record whose buffer is sized by info.byteFormLen and reset.
-     */
-    public TableRecordBytes(TableRecordInfoDigest info) {
-        this.info = info;
-        this.buf = new byte[info.byteFormLen];
-        reset();
-    }
-    /**
-     * Copy constructor: shares the info of the other record and copies its buffer.
-     */
-    public TableRecordBytes(TableRecordBytes another) {
-        this.info = another.info;
-        this.buf = Bytes.copy(another.buf);
-    }
-    @Override
-    public Object clone() {
-        return new TableRecordBytes(this);
-    }
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + Arrays.hashCode(buf);
-        result = prime * result + ((info == null) ? 0 : info.hashCode());
-        return result;
-    }
-    /**
-     * Two records are equal when they have the same class, the same buffer
-     * content and equal info.
-     */
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        // cast to this class, not to the TableRecord subclass: obj may be a plain TableRecordBytes
-        TableRecordBytes other = (TableRecordBytes) obj;
-        if (!Arrays.equals(buf, other.buf))
-            return false;
-        if (info == null) {
-            if (other.info != null)
-                return false;
-        } else if (!info.equals(other.info))
-            return false;
-        return true;
-    }
-}
diff --git a/cube/src/main/java/com/kylinolap/cube/invertedindex/ b/cube/src/main/java/com/kylinolap/cube/invertedindex/
--- a/cube/src/main/java/com/kylinolap/cube/invertedindex/
-package com.kylinolap.cube.invertedindex;
-/**
- * Created by honma on 11/10/14.
- */
-public interface TableRecordFactory {
-    public TableRecordBytes createTableRecord();