You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/09/17 11:45:17 UTC

[8/9] incubator-kylin git commit: KYLIN-1010 Decompose project job

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/assembly/src/test/java/org/apache/kylin/job/dataGen/ColumnConfig.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/dataGen/ColumnConfig.java b/assembly/src/test/java/org/apache/kylin/job/dataGen/ColumnConfig.java
new file mode 100644
index 0000000..44ba8f4
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/dataGen/ColumnConfig.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.dataGen;
+
+import java.util.ArrayList;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Value preferences for one fact-table column, bound from an entry of the {@code columnConfigs}
+ * list in the data generator's JSON configuration.
+ *
+ * <p>Jackson auto-detection of fields, getters, is-getters and setters is switched off, so only
+ * the fields annotated with {@link JsonProperty} take part in (de)serialization.
+ */
+@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
+public class ColumnConfig {
+
+    /** Name of the column these preferences apply to. */
+    @JsonProperty("columnName")
+    private String columnName;
+
+    /**
+     * Candidate values for the column. When {@link #asRange} is set, the generator reads this as
+     * a two-element {@code [low, high]} pair and rejects any other size.
+     */
+    @JsonProperty("valueSet")
+    private ArrayList<String> valueSet;
+
+    /** When {@code false} (and not a range), the generator may still emit random values. */
+    @JsonProperty("exclusive")
+    private boolean exclusive;
+
+    /** When {@code true}, values are drawn from within the range given by {@link #valueSet}. */
+    @JsonProperty("asRange")
+    private boolean asRange;
+
+    // ---- getters ----
+
+    public String getColumnName() {
+        return columnName;
+    }
+
+    public ArrayList<String> getValueSet() {
+        return valueSet;
+    }
+
+    public boolean isExclusive() {
+        return exclusive;
+    }
+
+    public boolean isAsRange() {
+        return asRange;
+    }
+
+    // ---- setters ----
+
+    public void setColumnName(String columnName) {
+        this.columnName = columnName;
+    }
+
+    public void setValueSet(ArrayList<String> valueSet) {
+        this.valueSet = valueSet;
+    }
+
+    public void setExclusive(boolean exclusive) {
+        this.exclusive = exclusive;
+    }
+
+    public void setAsRange(boolean asRange) {
+        this.asRange = asRange;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java b/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
new file mode 100644
index 0000000..a965753
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.dataGen;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * Generates synthetic rows for a cube's fact table and returns them as CSV text.
+ *
+ * <p>Foreign-key columns are filled with values sampled from the lookup-table CSVs under
+ * {@code /data/} in the resource store. Columns used directly as dimensions or as measure
+ * parameters get random (or configured) values. All remaining columns get a fixed default per
+ * data type. Every random value comes from one {@link Random} seeded by the caller, so the order
+ * of draws on {@link #r} determines the output for a given seed.
+ */
+public class FactTableGenerator {
+    /** Ratio of generated rows to distinct rows sampled from each lookup table. */
+    private static final double DEFAULT_CONFLICT_RATIO = 5;
+
+    /** printf pattern for all fractional values; always formatted with {@link Locale#ROOT}. */
+    private static final String DECIMAL_FORMAT = "%.4f";
+
+    /** 2001-01-01T00:00:00Z in epoch milliseconds: lower bound of random dates. */
+    private static final long DATE_20010101 = 978307200000L;
+    /** 2013-12-31T00:00:00Z in epoch milliseconds: upper bound of random dates. */
+    private static final long DATE_20131231 = 1388448000000L;
+
+    CubeInstance cube = null;
+    CubeDesc desc = null;
+    ResourceStore store = null;
+    String factTableName = null;
+
+    GenConfig genConf = null;
+
+    Random r = null;
+
+    String cubeName;
+    long randomSeed;
+    int rowCount;
+    int unlinkableRowCount;
+    int unlinkableRowCountMax;
+    double conflictRatio;
+    double linkableRatio;
+
+    // lookup table name -> its columns that are joined from the fact table
+    // (i.e. that appear as foreign keys on the fact table)
+    TreeMap<String, LinkedList<String>> lookupTableKeys = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+
+    // possible values of lookupTableKeys, sampled from the existing lookup tables;
+    // the key is in the format of tablename/columnname
+    TreeMap<String, ArrayList<String>> feasibleValues = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+
+    // lookup table name -> set of all sampled composite key tuples
+    // (only populated for tables joined on more than one column)
+    TreeMap<String, HashSet<Array<String>>> lookupTableCompositeKeyValues = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+
+    /**
+     * Stores the generation parameters, seeds the random source and resolves the cube metadata.
+     *
+     * @throws IllegalArgumentException if no cube named {@code cubeName} is found
+     */
+    private void init(String cubeName, int rowCount, double conflictRatio, double linkableRatio, long randomSeed) {
+        this.rowCount = rowCount;
+        this.conflictRatio = conflictRatio;
+        this.cubeName = cubeName;
+        this.randomSeed = randomSeed;
+        this.linkableRatio = linkableRatio;
+
+        // at most this many rows may fail to match every composite lookup key
+        this.unlinkableRowCountMax = (int) (this.rowCount * (1 - linkableRatio));
+        this.unlinkableRowCount = 0;
+
+        r = new Random(randomSeed);
+
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        cube = CubeManager.getInstance(config).getCube(cubeName);
+        if (cube == null) {
+            throw new IllegalArgumentException("Cube '" + cubeName + "' not found");
+        }
+        desc = cube.getDescriptor();
+        factTableName = desc.getFactTable();
+        store = ResourceStore.getStore(config);
+    }
+
+    /**
+     * Loads the users' per-column value preferences from {@code /data/data_gen_config.json}.
+     * Loading is best effort: an {@link IOException} is only reported, and {@link #genConf} then
+     * stays {@code null} (treated as "no column configured").
+     */
+    private void loadConfig() {
+        try (InputStream configStream = store.getResource("/data/data_gen_config.json")) {
+            this.genConf = GenConfig.loadConfig(configStream);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /** Opens a resource from the store as a buffered reader (platform default charset). */
+    private BufferedReader openReader(String path) throws IOException {
+        return new BufferedReader(new InputStreamReader(store.getResource(path)));
+    }
+
+    /**
+     * Samples up to {@code distinctRowCount} distinct rows of {@code /data/<lookupTableName>.csv}.
+     * It records the values of the given key columns in {@link #feasibleValues}. For multi-column
+     * keys it also records each value tuple in {@link #lookupTableCompositeKeyValues}.
+     *
+     * @throws Exception if a sampled composite key occurs more than once in the lookup table
+     */
+    private void loadLookupTableValues(String lookupTableName, LinkedList<String> columnNames, int distinctRowCount) throws Exception {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        boolean composite = columnNames.size() > 1;
+
+        // only composite keys need tuple tracking
+        if (composite && !lookupTableCompositeKeyValues.containsKey(lookupTableName)) {
+            lookupTableCompositeKeyValues.put(lookupTableName, new HashSet<Array<String>>());
+        }
+
+        try {
+            // CSV position of each key column
+            TreeMap<String, Integer> zeroBasedIndices = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+            for (String columnName : columnNames) {
+                ColumnDesc cDesc = MetadataManager.getInstance(config).getTableDesc(lookupTableName).findColumnByName(columnName);
+                zeroBasedIndices.put(columnName, cDesc.getZeroBasedIndex());
+            }
+
+            String path = "/data/" + lookupTableName + ".csv";
+
+            // first pass: count the lookup rows
+            int lookupRowCount = 0;
+            try (BufferedReader reader = openReader(path)) {
+                while (reader.readLine() != null) {
+                    lookupRowCount++;
+                }
+            }
+
+            // choose which row numbers to sample
+            HashSet<Integer> rows = new HashSet<Integer>();
+            distinctRowCount = Math.min(distinctRowCount, lookupRowCount);
+            while (rows.size() < distinctRowCount) {
+                rows.add(r.nextInt(lookupRowCount));
+            }
+
+            // second pass: collect the key values of the sampled rows
+            try (BufferedReader reader = openReader(path)) {
+                int curRowNum = 0;
+                String curRow;
+                while ((curRow = reader.readLine()) != null) {
+                    if (rows.contains(curRowNum)) {
+                        String[] tokens = curRow.split(",");
+                        String[] comboKeys = composite ? new String[columnNames.size()] : null;
+                        int index = 0;
+
+                        for (String columnName : columnNames) {
+                            String value = tokens[zeroBasedIndices.get(columnName)];
+
+                            String feasibleKey = lookupTableName + "/" + columnName;
+                            ArrayList<String> candidates = feasibleValues.get(feasibleKey);
+                            if (candidates == null) {
+                                candidates = new ArrayList<String>();
+                                feasibleValues.put(feasibleKey, candidates);
+                            }
+                            candidates.add(value);
+
+                            if (composite) {
+                                comboKeys[index++] = value;
+                            }
+                        }
+
+                        if (composite) {
+                            HashSet<Array<String>> knownKeys = lookupTableCompositeKeyValues.get(lookupTableName);
+                            if (!knownKeys.add(new Array<String>(comboKeys))) {
+                                throw new Exception("The composite key already exist in the lookup table");
+                            }
+                        }
+                    }
+                    curRowNum++;
+                }
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+            System.exit(1);
+        }
+    }
+
+    /**
+     * Prepares the candidate values for each joined column. It loads the generator config,
+     * collects the key columns of every joined lookup table, and samples their values.
+     */
+    private void prepare() throws Exception {
+        loadConfig();
+
+        for (DimensionDesc dim : desc.getDimensions()) {
+            JoinDesc join = dim.getJoin();
+            if (join == null) {
+                continue;
+            }
+
+            String lookupTable = dim.getTable();
+            for (String column : join.getPrimaryKey()) {
+                LinkedList<String> keys = lookupTableKeys.get(lookupTable);
+                if (keys == null) {
+                    keys = new LinkedList<String>();
+                    lookupTableKeys.put(lookupTable, keys);
+                }
+                if (!keys.contains(column)) {
+                    keys.add(column);
+                }
+            }
+        }
+
+        // sample about rowCount / conflictRatio distinct rows per lookup table (at least one)
+        int distinctRowCount = (int) (this.rowCount / this.conflictRatio);
+        distinctRowCount = (distinctRowCount == 0) ? 1 : distinctRowCount;
+        for (Map.Entry<String, LinkedList<String>> entry : lookupTableKeys.entrySet()) {
+            this.loadLookupTableValues(entry.getKey(), entry.getValue(), distinctRowCount);
+        }
+    }
+
+    /**
+     * Returns the cube's dimensions ordered by descending number of join primary-key columns.
+     * Dimensions without a join count as zero. The sort works on a copy, so the cube
+     * descriptor's own dimension list is left untouched.
+     */
+    private List<DimensionDesc> getSortedDimensionDescs() {
+        List<DimensionDesc> dimensions = new ArrayList<DimensionDesc>(desc.getDimensions());
+        Collections.sort(dimensions, new Comparator<DimensionDesc>() {
+            @Override
+            public int compare(DimensionDesc o1, DimensionDesc o2) {
+                // arguments swapped on purpose: descending order
+                return Integer.compare(primaryKeyLength(o2), primaryKeyLength(o1));
+            }
+        });
+        return dimensions;
+    }
+
+    /** Number of primary-key columns in the dimension's join, or 0 if it has no join. */
+    private static int primaryKeyLength(DimensionDesc dim) {
+        JoinDesc join = dim.getJoin();
+        return join != null ? join.getPrimaryKey().length : 0;
+    }
+
+    /**
+     * Generates the fact table and returns it as text.
+     *
+     * @return the generated rows, one comma-separated line per row
+     * @throws Exception if row generation fails
+     */
+    private String cookData() throws Exception {
+        // the columns on the fact table can be classified into three groups:
+        // 1. foreign keys (fact column -> "lookupTable/pkColumn")
+        TreeMap<String, String> factTableCol2LookupCol = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+        // 2. metrics or directly used dimensions
+        TreeSet<String> usedCols = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
+        // 3. others, not referenced anywhere (filled with per-type defaults by createRow)
+
+        // reverse mapping: "lookupTable/pkColumn" -> fact column
+        TreeMap<String, String> lookupCol2factTableCol = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+
+        // find fact table columns in fks. Dimensions with more pk columns come first, so the
+        // first mapping recorded for a shared fk column (the one kept) is the composite one.
+        List<DimensionDesc> dimensions = getSortedDimensionDescs();
+        for (DimensionDesc dim : dimensions) {
+            JoinDesc jDesc = dim.getJoin();
+            if (jDesc == null) {
+                continue; // handled in the next round
+            }
+
+            String[] fks = jDesc.getForeignKey();
+            String[] pks = jDesc.getPrimaryKey();
+            for (int i = 0; i < fks.length; ++i) {
+                String value = dim.getTable() + "/" + pks[i];
+
+                lookupCol2factTableCol.put(value, fks[i]);
+
+                String chosen = factTableCol2LookupCol.get(fks[i]);
+                if (chosen != null && !chosen.equals(value)) {
+                    System.out.println("Warning: Disambiguation on the mapping of column " + fks[i] + ", " + chosen + "(chosen) or " + value);
+                    continue;
+                }
+                factTableCol2LookupCol.put(fks[i], value);
+            }
+        }
+
+        // find fact table columns in direct dimensions
+        // DO NOT merge this with the previous loop: it needs the complete fk mapping
+        for (DimensionDesc dim : dimensions) {
+            if (dim.getJoin() == null) {
+                // column on fact table used directly as a dimension
+                for (String aColumn : dim.getColumn()) {
+                    if (!factTableCol2LookupCol.containsKey(aColumn)) {
+                        usedCols.add(aColumn);
+                    }
+                }
+            }
+        }
+
+        // find fact table columns in measures
+        for (MeasureDesc mDesc : desc.getMeasures()) {
+            List<TblColRef> pcols = mDesc.getFunction().getParameter().getColRefs();
+            if (pcols != null) {
+                for (TblColRef col : pcols) {
+                    if (!factTableCol2LookupCol.containsKey(col.getName())) {
+                        usedCols.add(col.getName());
+                    }
+                }
+            }
+        }
+
+        return createTable(this.rowCount, factTableCol2LookupCol, lookupCol2factTableCol, usedCols);
+    }
+
+    /** Left-pads a non-negative number below 10 with a single zero. */
+    private String normToTwoDigits(int v) {
+        if (v < 10)
+            return "0" + v;
+        else
+            return Integer.toString(v);
+    }
+
+    /** Returns a uniformly chosen element of a non-empty list. */
+    private String randomPick(ArrayList<String> candidates) {
+        int index = r.nextInt(candidates.size());
+        return candidates.get(index);
+    }
+
+    /**
+     * Moves {@code date} to the first day of its week (per the default-locale {@link Calendar})
+     * and formats it as yyyy-MM-dd.
+     */
+    private String toFirstDayOfWeek(Date date) {
+        Calendar cal = Calendar.getInstance();
+        cal.setTime(date);
+        cal.set(Calendar.DAY_OF_WEEK, cal.getFirstDayOfWeek());
+        return cal.get(Calendar.YEAR) + "-" + normToTwoDigits(cal.get(Calendar.MONTH) + 1) + "-" + normToTwoDigits(cal.get(Calendar.DAY_OF_MONTH));
+    }
+
+    /**
+     * Creates a random value within the configured range {@code [range[0], range[1]]}.
+     * Integers use a half-open range and need {@code high > low}. Dates are yyyy-MM-dd.
+     *
+     * @throws Exception for string columns, which cannot take range values
+     */
+    private String createRandomCell(ColumnDesc cDesc, ArrayList<String> range) throws Exception {
+        DataType type = cDesc.getType();
+        if (type.isStringFamily()) {
+            throw new Exception("Can't handle range values for string");
+
+        } else if (type.isIntegerFamily()) {
+            int low = Integer.parseInt(range.get(0));
+            int high = Integer.parseInt(range.get(1));
+            return Integer.toString(r.nextInt(high - low) + low);
+
+        } else if (type.isDouble()) {
+            double low = Double.parseDouble(range.get(0));
+            double high = Double.parseDouble(range.get(1));
+            return String.format(Locale.ROOT, DECIMAL_FORMAT, r.nextDouble() * (high - low) + low);
+
+        } else if (type.isFloat()) {
+            float low = Float.parseFloat(range.get(0));
+            float high = Float.parseFloat(range.get(1));
+            return String.format(Locale.ROOT, DECIMAL_FORMAT, r.nextFloat() * (high - low) + low);
+
+        } else if (type.isDecimal()) {
+            double low = Double.parseDouble(range.get(0));
+            double high = Double.parseDouble(range.get(1));
+            return String.format(Locale.ROOT, DECIMAL_FORMAT, r.nextDouble() * (high - low) + low);
+
+        } else if (type.isDateTimeFamily()) {
+            if (!type.isDate()) {
+                throw new RuntimeException("Does not support " + type);
+            }
+
+            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
+            Date start = format.parse(range.get(0));
+            Date end = format.parse(range.get(1));
+            long diff = end.getTime() - start.getTime();
+            return toFirstDayOfWeek(new Date(start.getTime() + (long) (diff * r.nextDouble())));
+
+        } else {
+            System.out.println("The data type " + type + " is not recognized");
+            System.exit(1);
+        }
+        return null;
+    }
+
+    /**
+     * Creates an unconstrained random value for the column's declared type name.
+     * Fractional values always use '.' as the decimal separator, so they cannot break the CSV.
+     */
+    private String createRandomCell(ColumnDesc cDesc) {
+        String type = cDesc.getTypeName();
+        switch (type.toLowerCase(Locale.ROOT)) {
+        case "string":
+        case "char":
+        case "varchar": {
+            // two letters out of 'a'..'j': 10 * 10 possible strings
+            StringBuilder sb = new StringBuilder();
+            for (int i = 0; i < 2; i++) {
+                sb.append((char) ('a' + r.nextInt(10)));
+            }
+            return sb.toString();
+        }
+        case "bigint":
+        case "int":
+        case "tinyint":
+        case "smallint":
+            return Integer.toString(r.nextInt(128));
+        case "double":
+            return String.format(Locale.ROOT, DECIMAL_FORMAT, r.nextDouble() * 100);
+        case "float":
+            return String.format(Locale.ROOT, DECIMAL_FORMAT, r.nextFloat() * 100);
+        case "decimal":
+            return String.format(Locale.ROOT, DECIMAL_FORMAT, r.nextDouble() * 100);
+        case "date": {
+            long diff = DATE_20131231 - DATE_20010101;
+            return toFirstDayOfWeek(new Date(DATE_20010101 + (long) (diff * r.nextDouble())));
+        }
+        default:
+            System.out.println("The data type " + type + " is not recognized");
+            System.exit(1);
+            return null;
+        }
+    }
+
+    /** Returns the fixed filler value for columns not referenced by the cube. */
+    private String createDefaultsCell(String type) {
+        switch (type.toLowerCase(Locale.ROOT)) {
+        case "string":
+        case "char":
+        case "varchar":
+            return "abcde";
+        case "bigint":
+        case "int":
+        case "tinyint":
+        case "smallint":
+        case "double":
+        case "float":
+        case "decimal":
+            return "0";
+        case "date":
+            return "1970-01-01";
+        default:
+            System.out.println("The data type " + type + " is not recognized");
+            System.exit(1);
+            return null;
+        }
+    }
+
+    /** Prints how each fact column was filled, followed by the generation parameters. */
+    private void printColumnMappings(TreeMap<String, String> factTableCol2LookupCol, TreeSet<String> usedCols, TreeSet<String> defaultColumns) {
+
+        System.out.println("=======================================================================");
+        System.out.format("%-30s %s%n", "FACT_TABLE_COLUMN", "MAPPING");
+        System.out.println();
+        for (Map.Entry<String, String> entry : factTableCol2LookupCol.entrySet()) {
+            System.out.format("%-30s %s%n", entry.getKey(), entry.getValue());
+        }
+        for (String key : usedCols) {
+            System.out.format("%-30s %s%n", key, "Random Values");
+        }
+        for (String key : defaultColumns) {
+            System.out.format("%-30s %s%n", key, "Default Values");
+        }
+        System.out.println("=======================================================================");
+
+        System.out.println("Parameters:");
+        System.out.println();
+        System.out.println("CubeName:        " + cubeName);
+        System.out.println("RowCount:        " + rowCount);
+        System.out.println("ConflictRatio:   " + conflictRatio);
+        System.out.println("LinkableRatio:   " + linkableRatio);
+        System.out.println("Seed:            " + randomSeed);
+        System.out.println();
+        System.out.println("The number of actual unlinkable fact rows is: " + this.unlinkableRowCount);
+        System.out.println("You can vary the above parameters to generate different datasets.");
+        System.out.println();
+    }
+
+    /**
+     * Checks that the row's values for every multi-column join form a tuple that was sampled
+     * from that lookup table. Single-column joins always match, because their values are picked
+     * from the sampled lookup values in the first place.
+     */
+    private boolean matchAllCompositeKeys(TreeMap<String, String> lookupCol2FactTableCol, LinkedList<String> columnValues) {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+
+        for (Map.Entry<String, LinkedList<String>> entry : lookupTableKeys.entrySet()) {
+            String lookupTable = entry.getKey();
+            LinkedList<String> keyColumns = entry.getValue();
+            if (keyColumns.size() == 1) {
+                continue;
+            }
+
+            String[] comboKey = new String[keyColumns.size()];
+            int index = 0;
+            for (String column : keyColumns) {
+                String factTableCol = lookupCol2FactTableCol.get(lookupTable + "/" + column);
+                int cardinal = MetadataManager.getInstance(config).getTableDesc(factTableName).findColumnByName(factTableCol).getZeroBasedIndex();
+                comboKey[index++] = columnValues.get(cardinal);
+            }
+            if (!lookupTableCompositeKeyValues.get(lookupTable).contains(new Array<String>(comboKey))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Creates a value for a measure or directly used dimension column. Unconfigured columns get
+     * random values. Configured columns that are neither exclusive nor a range get a random value
+     * half of the time. Otherwise the value is picked from, or drawn within, the configured value set.
+     *
+     * @throws Exception if the column's configuration is missing a usable value set
+     */
+    private String createCell(ColumnDesc cDesc) throws Exception {
+        // a config that failed to load behaves as "nothing configured"
+        ColumnConfig cConfig = (genConf == null) ? null : genConf.getColumnConfigByName(cDesc.getName());
+
+        if (cConfig == null) {
+            return createRandomCell(cDesc);
+        }
+
+        if (!cConfig.isAsRange() && !cConfig.isExclusive() && r.nextBoolean()) {
+            // the column still allows random values
+            return createRandomCell(cDesc);
+        }
+
+        ArrayList<String> valueSet = cConfig.getValueSet();
+        if (valueSet == null || valueSet.isEmpty()) {
+            throw new Exception("Did you forget to specify value set for " + cDesc.getName());
+        }
+
+        if (!cConfig.isAsRange()) {
+            return randomPick(valueSet);
+        }
+
+        if (valueSet.size() != 2) {
+            throw new Exception("Only two values can be set for range values, the column: " + cDesc.getName());
+        }
+        return createRandomCell(cDesc, valueSet);
+    }
+
+    /**
+     * Builds one fact row, in fact-table column order. Every column that receives a default
+     * value is added to {@code defaultColumns}.
+     *
+     * @throws IllegalStateException if no lookup values were sampled for a foreign-key column
+     */
+    private LinkedList<String> createRow(TreeMap<String, String> factTableCol2LookupCol, TreeSet<String> usedCols, TreeSet<String> defaultColumns) throws Exception {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        LinkedList<String> columnValues = new LinkedList<String>();
+
+        for (ColumnDesc cDesc : MetadataManager.getInstance(config).getTableDesc(factTableName).getColumns()) {
+            String colName = cDesc.getName();
+
+            if (factTableCol2LookupCol.containsKey(colName)) {
+                // fk column: pick one of the values sampled from the lookup table
+                String lookupCol = factTableCol2LookupCol.get(colName);
+                ArrayList<String> candidates = this.feasibleValues.get(lookupCol);
+                if (candidates == null || candidates.isEmpty()) {
+                    throw new IllegalStateException("No candidate values were sampled for " + lookupCol + " (fact column " + colName + ")");
+                }
+                columnValues.add(randomPick(candidates));
+            } else if (usedCols.contains(colName)) {
+                // metric or directly used dimension column
+                columnValues.add(createCell(cDesc));
+            } else {
+                // otherwise this column is not useful in OLAP
+                columnValues.add(createDefaultsCell(cDesc.getTypeName()));
+                defaultColumns.add(colName);
+            }
+        }
+
+        return columnValues;
+    }
+
+    /**
+     * Returns the text of the table contents, one comma-separated line per row.
+     *
+     * <p>Rows that fail {@link #matchAllCompositeKeys} are kept only while the unlinkable quota
+     * ({@link #unlinkableRowCountMax}) lasts. After that they are regenerated.
+     * NOTE(review): if matching rows are very rare, this loop can run for a long time.
+     *
+     * @param rowCount               number of rows to produce
+     * @param factTableCol2LookupCol fk column -> "lookupTable/pkColumn"
+     * @param lookupCol2FactTableCol "lookupTable/pkColumn" -> fk column
+     * @param usedCols               fact columns that receive random/configured values
+     * @return the generated table text
+     * @throws Exception if row generation fails
+     */
+    private String createTable(int rowCount, TreeMap<String, String> factTableCol2LookupCol, TreeMap<String, String> lookupCol2FactTableCol, TreeSet<String> usedCols) throws Exception {
+        try {
+            TreeSet<String> defaultColumns = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
+            String lineSeparator = System.getProperty("line.separator");
+
+            StringBuilder sb = new StringBuilder();
+            int generated = 0;
+            while (generated < rowCount) {
+                LinkedList<String> columnValues = createRow(factTableCol2LookupCol, usedCols, defaultColumns);
+
+                if (!matchAllCompositeKeys(lookupCol2FactTableCol, columnValues)) {
+                    if (unlinkableRowCount >= unlinkableRowCountMax) {
+                        continue; // quota exhausted: discard and retry
+                    }
+                    unlinkableRowCount++;
+                }
+
+                boolean first = true;
+                for (String c : columnValues) {
+                    if (!first) {
+                        sb.append(',');
+                    }
+                    sb.append(c);
+                    first = false;
+                }
+                sb.append(lineSeparator);
+
+                generated++;
+            }
+
+            printColumnMappings(factTableCol2LookupCol, usedCols, defaultColumns);
+
+            return sb.toString();
+
+        } catch (IOException e) {
+            e.printStackTrace();
+            System.exit(1);
+        }
+
+        return null;
+    }
+
+    /**
+     * Randomly creates a fact table and returns the table content.
+     *
+     * @param cubeName      name of the cube
+     * @param rowCount      expected row count generated; {@code null} means 10000
+     * @param linkableRatio the fraction of fact table rows that can be linked with all lookup
+     *                      tables by INNER join; {@code null} means 0.6
+     * @param randomSeed    random seed; {@code null} picks a fresh random seed
+     * @return the generated rows, one comma-separated line per row
+     * @throws Exception if the cube metadata or lookup data cannot be processed
+     */
+    public static String generate(String cubeName, String rowCount, String linkableRatio, String randomSeed) throws Exception {
+        if (rowCount == null)
+            rowCount = "10000";
+        if (linkableRatio == null)
+            linkableRatio = "0.6";
+
+        // the conflict ratio is intentionally not configurable
+        long seed = (randomSeed != null) ? Long.parseLong(randomSeed) : new Random().nextLong();
+
+        FactTableGenerator generator = new FactTableGenerator();
+        generator.init(cubeName, Integer.parseInt(rowCount), DEFAULT_CONFLICT_RATIO, Double.parseDouble(linkableRatio), seed);
+        generator.prepare();
+        return generator.cookData();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/assembly/src/test/java/org/apache/kylin/job/dataGen/GenConfig.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/dataGen/GenConfig.java b/assembly/src/test/java/org/apache/kylin/job/dataGen/GenConfig.java
new file mode 100644
index 0000000..c58cfb6
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/dataGen/GenConfig.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.dataGen;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.kylin.common.util.JsonUtil;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+
+/**
+ * Data-generation configuration read from JSON. It holds the per-column
+ * {@link ColumnConfig} entries listed under the "columnConfigs" property.
+ */
+@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
+public class GenConfig {
+
+    @JsonProperty("columnConfigs")
+    private ArrayList<ColumnConfig> columnConfigs;
+
+    // Lookup cache keyed by lower-cased column name; a null value records a known miss.
+    private final HashMap<String, ColumnConfig> cache = new HashMap<String, ColumnConfig>();
+
+    public ArrayList<ColumnConfig> getColumnConfigs() {
+        return columnConfigs;
+    }
+
+    /**
+     * Replaces the column configs. Cached lookups are dropped because they may point into the old list.
+     */
+    public void setColumnConfigs(ArrayList<ColumnConfig> columnConfigs) {
+        this.columnConfigs = columnConfigs;
+        cache.clear();
+    }
+
+    /**
+     * Finds the column config whose name matches {@code columnName}, ignoring case.
+     *
+     * @param columnName column name to look up; must not be null
+     * @return the matching config, or null when there is none (or no configs are loaded)
+     */
+    public ColumnConfig getColumnConfigByName(String columnName) {
+        // Locale.ROOT keeps the case folding stable regardless of the JVM default locale.
+        columnName = columnName.toLowerCase(java.util.Locale.ROOT);
+
+        if (cache.containsKey(columnName))
+            return cache.get(columnName);
+
+        if (columnConfigs == null)
+            return null; // nothing loaded yet; do not cache the miss
+
+        ColumnConfig found = null;
+        for (ColumnConfig cConfig : columnConfigs) {
+            String name = cConfig.getColumnName();
+            if (name != null && name.toLowerCase(java.util.Locale.ROOT).equals(columnName)) {
+                found = cConfig;
+                break;
+            }
+        }
+        cache.put(columnName, found);
+        return found;
+    }
+
+    /**
+     * Deserializes a {@link GenConfig} from a JSON stream.
+     *
+     * @param stream JSON input
+     * @return the parsed config, or null if the stream cannot be read or parsed
+     *         (the stack trace is printed to stderr in that case)
+     */
+    public static GenConfig loadConfig(InputStream stream) {
+        try {
+            return JsonUtil.readValue(stream, GenConfig.class);
+        } catch (IOException e) {
+            // JsonParseException and JsonMappingException are IOException subtypes and need no
+            // separate handling. Failure is reported by returning null (best-effort contract).
+            e.printStackTrace();
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
new file mode 100644
index 0000000..dcd460b
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -0,0 +1,240 @@
+package org.apache.kylin.job.hadoop.invertedindex;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.kylin.common.util.FIFOIterable;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodecWithState;
+import org.apache.kylin.invertedindex.model.IIRow;
+import org.apache.kylin.invertedindex.model.KeyValueCodec;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.ParameterDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
+import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
+import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
+import org.apache.kylin.storage.hbase.common.coprocessor.FilterDecorator;
+import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.ClearTextDictionary;
+import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.EndpointAggregators;
+import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.IIEndpoint;
+import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.generated.IIProtos;
+import org.apache.kylin.streaming.MicroStreamBatch;
+import org.apache.kylin.streaming.ParsedStreamMessage;
+import org.apache.kylin.streaming.StreamMessage;
+import org.apache.kylin.streaming.StreamParser;
+import org.apache.kylin.streaming.StringStreamParser;
+import org.apache.kylin.streaming.invertedindex.SliceBuilder;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * Builds an inverted-index slice from a few stream messages and encodes it into IIRows.
+ * It then checks two paths: decoding those rows back into a slice, and aggregating them
+ * through {@link IIEndpoint}.
+ */
+public class IITest extends LocalFileMetadataTestCase {
+
+    String iiName = "test_kylin_ii_inner_join";
+    IIInstance ii;
+    IIDesc iiDesc;
+    String cubeName = "test_kylin_cube_with_slr_empty";
+
+    List<IIRow> iiRows;
+
+    final String[] inputData = new String[] { //
+    "FP-non GTC,0,15,145970,0,28,Toys,2008-10-08 07:18:40,USER_Y,Toys & Hobbies,Models & Kits,Automotive,0,Ebay,USER_S,15,Professional-Other,2012-08-16,2012-08-11,0,2012-08-16,145970,10000329,26.8551,0", //
+            "ABIN,0,-99,43479,0,21,Photo,2012-09-11 20:26:04,USER_Y,Cameras & Photo,Film Photography,Other,0,Ebay,USER_S,-99,Not Applicable,2012-08-16,2012-08-11,0,2012-08-16,43479,10000807,26.2474,0", //
+            "ABIN,0,16,80053,0,12,Computers,2012-06-19 21:15:09,USER_Y,Computers/Tablets & Networking,MonitorProjectors & Accs,Monitors,0,Ebay,USER_S,16,Consumer-Other,2012-08-16,2012-08-11,0,2012-08-16,80053,10000261,94.2273,0" };
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+        this.ii = IIManager.getInstance(getTestConfig()).getII(iiName);
+        this.iiDesc = ii.getDescriptor();
+
+        List<StreamMessage> streamMessages = Lists.transform(Arrays.asList(inputData), new Function<String, StreamMessage>() {
+            @Nullable
+            @Override
+            public StreamMessage apply(String input) {
+                return new StreamMessage(System.currentTimeMillis(), input.getBytes());
+            }
+        });
+
+        StreamParser parser = StringStreamParser.instance;
+
+        // Keep only the messages the parser accepts.
+        MicroStreamBatch batch = new MicroStreamBatch(0);
+        for (StreamMessage message : streamMessages) {
+            ParsedStreamMessage parsedStreamMessage = parser.parse(message);
+            if (parsedStreamMessage.isAccepted()) {
+                batch.add(parsedStreamMessage);
+            }
+        }
+
+        iiRows = Lists.newArrayList();
+        final Slice slice = new SliceBuilder(iiDesc, (short) 0, true).buildSlice(batch);
+        IIKeyValueCodec codec = new IIKeyValueCodec(slice.getInfo());
+        for (IIRow iiRow : codec.encodeKeyValue(slice)) {
+            iiRows.add(iiRow);
+        }
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    /**
+     * Simulates building a stream into slices and encoding the slice into IIRows, then
+     * reconstructs a slice from those IIRows.
+     */
+    @Test
+    public void basicTest() {
+        Queue<IIRow> buffer = Lists.newLinkedList();
+        FIFOIterable bufferIterable = new FIFOIterable(buffer);
+        TableRecordInfo info = new TableRecordInfo(iiDesc);
+        TableRecordInfoDigest digest = info.getDigest();
+        KeyValueCodec codec = new IIKeyValueCodecWithState(digest);
+        Iterator<Slice> slices = codec.decodeKeyValue(bufferIterable).iterator();
+
+        Assert.assertFalse(slices.hasNext());
+        Assert.assertEquals(digest.getColumnCount(), iiRows.size());
+
+        // The test expects a slice to become available only after the row of its last column is buffered.
+        for (int i = 0; i < digest.getColumnCount(); ++i) {
+            buffer.add(iiRows.get(i));
+
+            if (i != digest.getColumnCount() - 1) {
+                Assert.assertFalse(slices.hasNext());
+            } else {
+                Assert.assertTrue(slices.hasNext());
+            }
+        }
+
+        Slice newSlice = slices.next();
+        Assert.assertEquals(2, newSlice.getLocalDictionaries()[0].getSize());
+    }
+
+    /**
+     * Runs SUM(PRICE) grouped by lstg_format_name, filtered by cal_dt >= 2012-08-16, through
+     * the endpoint. A stub scanner serves the pre-encoded IIRows.
+     */
+    @Test
+    public void IIEndpointTest() {
+        TableRecordInfo info = new TableRecordInfo(ii.getDescriptor());
+        CoprocessorRowType type = CoprocessorRowType.fromTableRecordInfo(info, ii.getFirstSegment().getColumns());
+        CoprocessorProjector projector = CoprocessorProjector.makeForEndpoint(info, Collections.singletonList(ii.getDescriptor().findColumnRef("default.test_kylin_fact", "lstg_format_name")));
+
+        FunctionDesc f1 = new FunctionDesc();
+        f1.setExpression("SUM");
+        ParameterDesc p1 = new ParameterDesc();
+        p1.setType("column");
+        p1.setValue("PRICE");
+        f1.setParameter(p1);
+        f1.setReturnType("decimal(19,4)");
+
+        TblColRef column = ii.getDescriptor().findColumnRef("default.test_kylin_fact", "cal_dt");
+        CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GTE);
+        ColumnTupleFilter columnFilter = new ColumnTupleFilter(column);
+        compareFilter.addChild(columnFilter);
+        ConstantTupleFilter constantFilter = new ConstantTupleFilter("2012-08-16");
+        compareFilter.addChild(constantFilter);
+
+        EndpointAggregators aggregators = EndpointAggregators.fromFunctions(info, Collections.singletonList(f1));
+        CoprocessorFilter filter = CoprocessorFilter.fromFilter(new ClearTextDictionary(info), compareFilter, FilterDecorator.FilterConstantsTreatment.AS_IT_IS);
+
+        final Iterator<IIRow> iiRowIterator = iiRows.iterator();
+
+        IIEndpoint endpoint = new IIEndpoint();
+        // Only nextRaw(List<Cell>) is implemented; any other scanner call fails loudly.
+        IIProtos.IIResponseInternal response = endpoint.getResponse(new RegionScanner() {
+            @Override
+            public HRegionInfo getRegionInfo() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public boolean isFilterDone() throws IOException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public boolean reseek(byte[] row) throws IOException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public long getMaxResultSize() {
+                throw new NotImplementedException();
+
+            }
+
+            @Override
+            public long getMvccReadPoint() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public boolean nextRaw(List<Cell> result) throws IOException {
+                if (iiRowIterator.hasNext()) {
+                    IIRow iiRow = iiRowIterator.next();
+                    result.addAll(iiRow.makeCells());
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+
+            @Override
+            public boolean nextRaw(List<Cell> result, int limit) throws IOException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public boolean next(List<Cell> results) throws IOException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public boolean next(List<Cell> result, int limit) throws IOException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public void close() throws IOException {
+                throw new NotImplementedException();
+            }
+        }, type, projector, aggregators, filter);
+
+        Assert.assertEquals(2, response.getRowsList().size());
+        Set<String> answers = Sets.newHashSet("120.4747", "26.8551");
+        for (IIProtos.IIResponseInternal.IIRow responseRow : response.getRowsList()) {
+            byte[] measuresBytes = responseRow.getMeasures().toByteArray();
+            List<Object> metrics = aggregators.deserializeMetricValues(measuresBytes, 0);
+            Assert.assertTrue("unexpected SUM(PRICE) value: " + metrics.get(0), answers.contains(metrics.get(0)));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/assembly/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java b/assembly/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
new file mode 100644
index 0000000..be4fa26
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
@@ -0,0 +1,90 @@
+package org.apache.kylin.job.streaming;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.job.DeployUtil;
+import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
+import org.apache.kylin.streaming.StreamBuilder;
+import org.apache.kylin.streaming.StreamMessage;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Manual test that streams rows from a local {@code ../table.txt} into a periodical cube
+ * stream builder for {@value #CUBE_NAME}. It is ignored by default because it needs the
+ * sandbox test data and that local file.
+ */
+@Ignore
+public class CubeStreamConsumerTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(CubeStreamConsumerTest.class);
+
+    private KylinConfig kylinConfig;
+
+    private static final String CUBE_NAME = "test_kylin_cube_without_slr_left_join_ready";
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this
+    }
+
+    @Before
+    public void before() throws Exception {
+        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+        DeployUtil.initCliWorkDir();
+        DeployUtil.deployMetadata();
+        DeployUtil.overrideJobJarLocations();
+        final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(CUBE_NAME);
+        CubeUpdate cubeBuilder = new CubeUpdate(cube);
+        cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
+        // remove all existing segments
+        CubeManager.getInstance(kylinConfig).updateCube(cubeBuilder);
+
+    }
+
+    @Test
+    public void test() throws Exception {
+        LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<>();
+        List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
+        queues.add(queue);
+        StreamBuilder cubeStreamBuilder = StreamBuilder.newPeriodicalStreamBuilder(CUBE_NAME, queues, new CubeStreamConsumer(CUBE_NAME), System.currentTimeMillis(), 30L * 1000);
+        java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
+        try {
+            final Future<?> future = executor.submit(cubeStreamBuilder);
+            loadDataFromLocalFile(queue, 100000);
+            future.get();
+        } finally {
+            // Release the worker thread even if loading or building fails.
+            executor.shutdownNow();
+        }
+    }
+
+    /**
+     * Reads up to {@code maxCount} tab-separated lines from {@code ../table.txt}. Each line is
+     * queued as a comma-joined {@link StreamMessage}, and {@link StreamMessage#EOF} is queued last.
+     */
+    private void loadDataFromLocalFile(BlockingQueue<StreamMessage> queue, final int maxCount) throws IOException, InterruptedException {
+        try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("../table.txt")))) {
+            String line;
+            int count = 0;
+            // Check the limit before reading so no extra line is consumed once maxCount is reached.
+            while (count < maxCount && (line = br.readLine()) != null) {
+                count++;
+                final List<String> strings = Arrays.asList(line.split("\t"));
+                queue.put(new StreamMessage(System.currentTimeMillis(), StringUtils.join(",", strings).getBytes()));
+            }
+        }
+        queue.put(StreamMessage.EOF);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/assembly/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java b/assembly/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
new file mode 100644
index 0000000..dc6d312
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
@@ -0,0 +1,144 @@
+package org.apache.kylin.job.streaming;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.common.util.TimeUtil;
+import org.apache.kylin.streaming.MicroStreamBatch;
+import org.apache.kylin.streaming.MicroStreamBatchConsumer;
+import org.apache.kylin.streaming.ParsedStreamMessage;
+import org.apache.kylin.streaming.StreamBuilder;
+import org.apache.kylin.streaming.StreamMessage;
+import org.apache.kylin.streaming.StreamParser;
+import org.apache.kylin.streaming.StreamingManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Feeds timestamped messages into two partition queues of a periodical {@link StreamBuilder}.
+ * It then checks that the per-partition offsets recorded in {@link StreamingManager} for
+ * stream "test" equal an offset computed while the messages were being fed.
+ */
+public class PeriodicalStreamBuilderTest extends LocalFileMetadataTestCase {
+
+    private static final Logger logger = LoggerFactory.getLogger(PeriodicalStreamBuilderTest.class);
+
+    @Before
+    public void setup() {
+        this.createTestMetadata();
+
+    }
+
+    @After
+    public void clear() {
+        this.cleanupTestMetadata();
+    }
+
+    /**
+     * Builds {@code count} messages with offsets 0..count-1. Their payloads are timestamps
+     * spread from {@code start} to {@code end}; the last one is exactly {@code end}.
+     * NOTE(review): this helper is not called anywhere in this class.
+     */
+    private List<StreamMessage> prepareTestData(long start, long end, int count) {
+        double step = (double) (end - start) / (count - 1);
+        long ts = start;
+        int offset = 0;
+        ArrayList<StreamMessage> result = Lists.newArrayList();
+        for (int i = 0; i < count - 1; ++i) {
+            result.add(new StreamMessage(offset++, String.valueOf(ts).getBytes()));
+            // compound assignment truncates (long)(ts + step) on every step, so drift accumulates;
+            // the final message below is pinned to `end` explicitly
+            ts += step;
+        }
+        result.add(new StreamMessage(offset++, String.valueOf(end).getBytes()));
+        assertEquals(count, result.size());
+        assertEquals(start + "", new String(result.get(0).getRawData()));
+        assertEquals(end + "", new String(result.get(count - 1).getRawData()));
+        return result;
+    }
+
+    @Test
+    public void test() throws ExecutionException, InterruptedException {
+
+        List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
+        queues.add(new LinkedBlockingQueue<StreamMessage>());
+        queues.add(new LinkedBlockingQueue<StreamMessage>());
+
+        final long interval = 3000L;
+        final long nextPeriodStart = TimeUtil.getNextPeriodStart(System.currentTimeMillis(), interval);
+
+        // partition ids 0..queues.size()-1, used to read back the recorded offsets
+        final List<Integer> partitionIds = Lists.newArrayList();
+        for (int i = 0; i < queues.size(); i++) {
+            partitionIds.add(i);
+        }
+
+        // consumer that only logs each batch it receives
+        final MicroStreamBatchConsumer consumer = new MicroStreamBatchConsumer() {
+            @Override
+            public void consume(MicroStreamBatch microStreamBatch) throws Exception {
+                logger.info("consuming batch:" + microStreamBatch.getPartitionId() + " count:" + microStreamBatch.size() + " timestamp:" + microStreamBatch.getTimestamp() + " offset:" + microStreamBatch.getOffset());
+            }
+
+            @Override
+            public void stop() {
+                logger.info("consumer stopped");
+            }
+        };
+        final StreamBuilder streamBuilder = StreamBuilder.newPeriodicalStreamBuilder("test", queues, consumer, nextPeriodStart, interval);
+
+        // each message's raw payload is its timestamp in millis; every message is accepted
+        streamBuilder.setStreamParser(new StreamParser() {
+            @Override
+            public ParsedStreamMessage parse(StreamMessage streamMessage) {
+                return new ParsedStreamMessage(Collections.<String> emptyList(), streamMessage.getOffset(), Long.parseLong(new String(streamMessage.getRawData())), true);
+            }
+        });
+
+        // NOTE(review): this executor is never shut down.
+        Future<?> future = Executors.newSingleThreadExecutor().submit(streamBuilder);
+        // end of the first period [nextPeriodStart, nextPeriodStart + interval)
+        long timeout = nextPeriodStart + interval;
+        int messageCount = 0;
+        int inPeriodMessageCount = 0;
+        // Offset of the last message put before the first message with ts >= timeout.
+        // 0 doubles as the "not set yet" sentinel.
+        int expectedOffset = 0;
+        logger.info("prepare to add StreamMessage");
+        // put one message (same offset, current time) into every queue about every 10 ms,
+        // until one interval after the end of the first period
+        while (true) {
+            long ts = System.currentTimeMillis();
+            if (ts >= timeout + interval) {
+                break;
+            }
+            if (ts >= nextPeriodStart && ts < timeout) {
+                inPeriodMessageCount++;
+            }
+            for (BlockingQueue<StreamMessage> queue : queues) {
+                queue.put(new StreamMessage(messageCount, String.valueOf(ts).getBytes()));
+            }
+            if (expectedOffset == 0 && ts >= timeout) {
+                expectedOffset = messageCount - 1;
+            }
+            messageCount++;
+            Thread.sleep(10);
+        }
+        logger.info("totally put " + messageCount + " StreamMessages");
+        logger.info("totally in period " + inPeriodMessageCount + " StreamMessages");
+
+        for (BlockingQueue<StreamMessage> queue : queues) {
+            queue.put(StreamMessage.EOF);
+        }
+
+        future.get();
+
+        // removes one remaining element per queue; presumably the EOF marker the builder left behind — TODO confirm
+        for (BlockingQueue<StreamMessage> queue : queues) {
+            queue.take();
+        }
+
+        // presumably the builder commits the offset of the last message of the first period per partition — verify against StreamBuilder
+        final Map<Integer, Long> offsets = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getOffset("test", partitionIds);
+        logger.info("offset:" + offsets);
+        for (Long offset : offsets.values()) {
+            assertEquals(expectedOffset, offset.longValue());
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/assembly/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java b/assembly/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
new file mode 100644
index 0000000..075a048
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
@@ -0,0 +1,76 @@
+package org.apache.kylin.job.streaming;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.SortedMultiset;
+import com.google.common.collect.TreeMultiset;
+
+/**
+ * Generates fact table data for test_streaming_table (cube streaming) as JSON strings.
+ */
+public class StreamingTableDataGenerator {
+
+    private static final Logger logger = LoggerFactory.getLogger(StreamingTableDataGenerator.class);
+    private static final ObjectMapper mapper = new ObjectMapper();
+
+    /**
+     * Generates {@code recordCount} JSON records whose random timestamps lie in
+     * [startTime, endTime). The records are returned in ascending timestamp order.
+     * <p>
+     * Each record has a "timestamp" key plus one lower-cased key per column of {@code tableName}:
+     * <ul>
+     * <li>string columns get a random capital letter;</li>
+     * <li>integer columns get an int in [0, 10000);</li>
+     * <li>other numeric columns get a decimal between 0 and 100 with four fraction digits;</li>
+     * <li>date/time columns and columns of other types are omitted.</li>
+     * </ul>
+     * A record that fails to serialize is logged and skipped.
+     *
+     * @throws IllegalArgumentException if startTime >= endTime, recordCount <= 0, or no table
+     *                                  descriptor is found for {@code tableName}
+     */
+    public static List<String> generate(int recordCount, long startTime, long endTime, String tableName) {
+        Preconditions.checkArgument(startTime < endTime);
+        Preconditions.checkArgument(recordCount > 0);
+
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        TableDesc tableDesc = MetadataManager.getInstance(kylinConfig).getTableDesc(tableName);
+        // fail fast with a clear message instead of a NullPointerException inside the loop
+        Preconditions.checkArgument(tableDesc != null, "table %s not found", tableName);
+
+        SortedMultiset<Long> times = TreeMultiset.create();
+        Random r = new Random();
+        for (int i = 0; i < recordCount; i++) {
+            long t = startTime + (long) ((endTime - startTime) * r.nextDouble());
+            times.add(t);
+        }
+
+        List<String> ret = Lists.newArrayList();
+        HashMap<String, String> kvs = Maps.newHashMap();
+        for (long time : times) {
+            kvs.clear();
+            kvs.put("timestamp", String.valueOf(time));
+            for (ColumnDesc columnDesc : tableDesc.getColumns()) {
+                // Locale.ROOT keeps the key stable regardless of the default locale (e.g. Turkish 'I').
+                String lowerCaseColumnName = columnDesc.getName().toLowerCase(java.util.Locale.ROOT);
+                DataType dataType = columnDesc.getType();
+                if (dataType.isDateTimeFamily()) {
+                    //TimedJsonStreamParser will derived minute_start,hour_start,day_start from timestamp
+                    continue;
+                } else if (dataType.isStringFamily()) {
+                    char c = (char) ('A' + (int) (26 * r.nextDouble()));
+                    kvs.put(lowerCaseColumnName, String.valueOf(c));
+                } else if (dataType.isIntegerFamily()) {
+                    int v = r.nextInt(10000);
+                    kvs.put(lowerCaseColumnName, String.valueOf(v));
+                } else if (dataType.isNumberFamily()) {
+                    // Locale.ROOT forces '.' as the decimal separator; a locale such as de_DE would
+                    // otherwise emit "12,3456", which is not a parseable number downstream.
+                    String v = String.format(java.util.Locale.ROOT, "%.4f", r.nextDouble() * 100);
+                    kvs.put(lowerCaseColumnName, v);
+                }
+            }
+            try {
+                ret.add(mapper.writeValueAsString(kvs));
+            } catch (JsonProcessingException e) {
+                logger.error("error!", e);
+            }
+        }
+
+        return ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/assembly/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java b/assembly/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
new file mode 100644
index 0000000..8218d51
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Integration test that reloads two Hive tables through {@link HiveSourceTableLoader} and
+ * checks that exactly those tables are reported as loaded. It does nothing unless running
+ * against the sandbox.
+ */
+public class ITHiveSourceTableLoaderTest extends HBaseMetadataTestCase {
+
+    @Before
+    public void setup() throws Exception {
+        super.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        super.cleanupTestMetadata();
+    }
+
+    @Test
+    public void test() throws IOException {
+        if (useSandbox()) {
+            KylinConfig kylinConfig = getTestConfig();
+            String[] tables = { "DEFAULT.TEST_KYLIN_FACT", "EDW.TEST_CAL_DT" };
+            Set<String> reloaded = HiveSourceTableLoader.reloadHiveTables(tables, kylinConfig);
+
+            assertTrue(reloaded.size() == tables.length);
+            for (String table : tables) {
+                assertTrue(reloaded.contains(table));
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/assembly/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java b/assembly/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
new file mode 100644
index 0000000..57c0be3
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import java.io.IOException;
+
+import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Reads the whole {@code default.test_kylin_fact} Hive table through {@link HiveTableReader},
+ * checking that every row has 9 columns and that 10000 rows are returned in total.
+ *
+ * This test case needs the Hive runtime; please run it with the sandbox.
+ * @author shaoshi
+ *
+ * It is in the exclude list of default profile in pom.xml
+ */
+public class ITHiveTableReaderTest extends HBaseMetadataTestCase {
+
+    @Test
+    public void test() throws IOException {
+        HiveTableReader reader = new HiveTableReader("default", "test_kylin_fact");
+        int rowNumber = 0;
+        try {
+            while (reader.next()) {
+                String[] row = reader.getRow();
+                Assert.assertEquals(9, row.length);
+                rowNumber++;
+            }
+        } finally {
+            // Close even when an assertion fails mid-scan so the reader is not leaked.
+            reader.close();
+        }
+        Assert.assertEquals(10000, rowNumber);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/assembly/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java b/assembly/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
new file mode 100644
index 0000000..0df632a
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.kylin.dict.lookup.SnapshotManager;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.ReadableTable.TableReader;
+import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author yangli9
+ *
+ * Builds a snapshot of the {@code EDW.TEST_SITES} lookup table and verifies that reading the
+ * snapshot back yields the same rows, in the same order, as reading the source table directly.
+ */
+public class ITSnapshotManagerTest extends HBaseMetadataTestCase {
+
+    SnapshotManager snapshotMgr;
+
+    @Before
+    public void setup() throws Exception {
+        createTestMetadata();
+        snapshotMgr = SnapshotManager.getInstance(getTestConfig());
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    @Test
+    public void basicTest() throws Exception {
+        String tableName = "EDW.TEST_SITES";
+        TableDesc tableDesc = MetadataManager.getInstance(getTestConfig()).getTableDesc(tableName);
+        ReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc);
+        String snapshotPath = snapshotMgr.buildSnapshot(hiveTable, tableDesc).getResourcePath();
+
+        // presumably forces the snapshot below to be loaded again rather than served from cache — confirm
+        snapshotMgr.wipeoutCache();
+
+        SnapshotTable snapshot = snapshotMgr.getSnapshotTable(snapshotPath);
+
+        // compare hive & snapshot; both readers are closed even if an assertion fails
+        TableReader hiveReader = hiveTable.getReader();
+        try {
+            TableReader snapshotReader = snapshot.getReader();
+            try {
+                assertSameRows(hiveReader, snapshotReader);
+            } finally {
+                snapshotReader.close();
+            }
+        } finally {
+            hiveReader.close();
+        }
+    }
+
+    /**
+     * Advances both readers in lock-step, asserting that they run out of rows at the same time
+     * and return identical rows at every step.
+     */
+    private static void assertSameRows(TableReader expected, TableReader actual) throws Exception {
+        while (true) {
+            boolean expectedNext = expected.next();
+            boolean actualNext = actual.next();
+            assertEquals(expectedNext, actualNext);
+
+            if (!expectedNext) {
+                return;
+            }
+
+            assertArrayEquals(expected.getRow(), actual.getRow());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/build/script/prepare_libs.sh
----------------------------------------------------------------------
diff --git a/build/script/prepare_libs.sh b/build/script/prepare_libs.sh
index a80d4f4..2012b3f 100755
--- a/build/script/prepare_libs.sh
+++ b/build/script/prepare_libs.sh
@@ -13,7 +13,7 @@ echo "version ${version}"
 echo "copy lib file"
 rm -rf build/lib
 mkdir build/lib
-cp job/target/kylin-job-${version}-job.jar build/lib/kylin-job-${version}.jar
+cp assembly/target/kylin-job-${version}-job.jar build/lib/kylin-job-${version}.jar
 cp storage-hbase/target/kylin-storage-hbase-${version}-coprocessor.jar build/lib/kylin-coprocessor-${version}.jar
 cp jdbc/target/kylin-jdbc-${version}.jar build/lib/kylin-jdbc-${version}.jar
 # Copied file becomes 000 for some env (e.g. my Cygwin)

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JobInstance.java b/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
index 001dfe5..e5a4540 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
@@ -37,6 +37,7 @@ import com.fasterxml.jackson.annotation.JsonManagedReference;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.Lists;
 
+@SuppressWarnings("serial")
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
 public class JobInstance extends RootPersistentEntity implements Comparable<JobInstance> {
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
index 9ebe335..1e34f39 100644
--- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Maps;
 
 /**
  */
+@SuppressWarnings("serial")
 @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
 public class ExecutableOutputPO extends RootPersistentEntity {
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
index 52df562..75717e0 100644
--- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
@@ -29,6 +29,7 @@ import com.google.common.collect.Maps;
 
 /**
  */
+@SuppressWarnings("serial")
 @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
 public class ExecutablePO extends RootPersistentEntity {
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
new file mode 100644
index 0000000..ecac973
--- /dev/null
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.impl.threadpool;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.lock.MockJobLock;
+import org.apache.kylin.job.manager.ExecutableManager;
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * Base class for scheduler tests. It creates local test metadata, starts the
+ * {@link DefaultScheduler} with a {@link MockJobLock}, and offers polling helpers that block
+ * until a job reaches a given state.
+ */
+public abstract class BaseSchedulerTest extends LocalFileMetadataTestCase {
+
+    private DefaultScheduler scheduler;
+
+    protected ExecutableManager jobService;
+
+    @Before
+    public void setup() throws Exception {
+        createTestMetadata();
+        // NOTE(review): if DEFAULT_SCHEDULER_INTERVAL_SECONDS is a compile-time constant, javac has already
+        // inlined it at its use sites and this reflective write may have no effect there — confirm.
+        setFinalStatic(ExecutableConstants.class.getField("DEFAULT_SCHEDULER_INTERVAL_SECONDS"), 10);
+        jobService = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
+        scheduler = DefaultScheduler.getInstance();
+        scheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv()), new MockJobLock());
+        if (!scheduler.hasStarted()) {
+            throw new RuntimeException("scheduler has not been started");
+        }
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    /**
+     * Overwrites a {@code static final} field by clearing FINAL from its modifiers via reflection.
+     * NOTE(review): depends on the private {@code Field.modifiers} field, which newer JDKs filter
+     * from reflection — confirm this still works on the target JDK.
+     */
+    static void setFinalStatic(Field field, Object newValue) throws Exception {
+        field.setAccessible(true);
+
+        Field modifiersField = Field.class.getDeclaredField("modifiers");
+        modifiersField.setAccessible(true);
+        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+
+        field.set(null, newValue);
+    }
+
+    /**
+     * Blocks, polling every 5 seconds, until the job is SUCCEED, ERROR, STOPPED or DISCARDED.
+     *
+     * @throws IllegalStateException if the waiting thread is interrupted (the interrupt flag is restored)
+     */
+    protected void waitForJobFinish(String jobId) {
+        while (true) {
+            final ExecutableState status = jobService.getJob(jobId).getStatus();
+            if (status == ExecutableState.SUCCEED || status == ExecutableState.ERROR || status == ExecutableState.STOPPED || status == ExecutableState.DISCARDED) {
+                return;
+            }
+            sleepBetweenPolls(jobId, 5000);
+        }
+    }
+
+    /**
+     * Blocks, polling every {@code interval} milliseconds, until the job's status equals {@code state}.
+     *
+     * @throws IllegalStateException if the waiting thread is interrupted (the interrupt flag is restored)
+     */
+    protected void waitForJobStatus(String jobId, ExecutableState state, long interval) {
+        while (jobService.getJob(jobId).getStatus() != state) {
+            sleepBetweenPolls(jobId, interval);
+        }
+    }
+
+    /**
+     * Sleeps between two polls. On interrupt it restores the thread's interrupt flag and aborts the
+     * wait instead of silently continuing to poll.
+     */
+    private static void sleepBetweenPolls(String jobId, long millis) {
+        try {
+            Thread.sleep(millis);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IllegalStateException("interrupted while waiting for job " + jobId, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
new file mode 100644
index 0000000..d50baad
--- /dev/null
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.impl.threadpool;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kylin.job.BaseTestExecutable;
+import org.apache.kylin.job.ErrorTestExecutable;
+import org.apache.kylin.job.FailedTestExecutable;
+import org.apache.kylin.job.SelfStopExecutable;
+import org.apache.kylin.job.SucceedTestExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.junit.Test;
+
+/**
+ * Runs chained jobs built from test executables through the scheduler started in
+ * {@link BaseSchedulerTest#setup()} and checks the final state of the job and each task.
+ */
+public class DefaultSchedulerTest extends BaseSchedulerTest {
+
+    @Test
+    public void testSingleTaskJob() throws Exception {
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        BaseTestExecutable task1 = new SucceedTestExecutable();
+        job.addTask(task1);
+        jobService.addJob(job);
+        waitForJobFinish(job.getId());
+        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+    }
+
+    @Test
+    public void testSucceed() throws Exception {
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        BaseTestExecutable task1 = new SucceedTestExecutable();
+        BaseTestExecutable task2 = new SucceedTestExecutable();
+        job.addTask(task1);
+        job.addTask(task2);
+        jobService.addJob(job);
+        waitForJobFinish(job.getId());
+        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState());
+    }
+
+    @Test
+    public void testSucceedAndFailed() throws Exception {
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        BaseTestExecutable task1 = new SucceedTestExecutable();
+        BaseTestExecutable task2 = new FailedTestExecutable();
+        job.addTask(task1);
+        job.addTask(task2);
+        jobService.addJob(job);
+        waitForJobFinish(job.getId());
+        assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
+        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+        assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState());
+    }
+
+    @Test
+    public void testSucceedAndError() throws Exception {
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        BaseTestExecutable task1 = new ErrorTestExecutable();
+        BaseTestExecutable task2 = new SucceedTestExecutable();
+        job.addTask(task1);
+        job.addTask(task2);
+        jobService.addJob(job);
+        waitForJobFinish(job.getId());
+        assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
+        assertEquals(ExecutableState.ERROR, jobService.getOutput(task1.getId()).getState());
+        // the first task errored, so the second one is expected never to have been started
+        assertEquals(ExecutableState.READY, jobService.getOutput(task2.getId()).getState());
+    }
+
+    @Test
+    public void testDiscard() throws Exception {
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        BaseTestExecutable task1 = new SelfStopExecutable();
+        job.addTask(task1);
+        jobService.addJob(job);
+        waitForJobStatus(job.getId(), ExecutableState.RUNNING, 500);
+        jobService.discardJob(job.getId());
+        waitForJobFinish(job.getId());
+        assertEquals(ExecutableState.DISCARDED, jobService.getOutput(job.getId()).getState());
+        assertEquals(ExecutableState.DISCARDED, jobService.getOutput(task1.getId()).getState());
+        // NOTE(review): the purpose of this fixed sleep is not evident here; possibly it lets the
+        // discarded task wind down before teardown — confirm or remove.
+        Thread.sleep(5000);
+        System.out.println(job);
+    }
+
+    /**
+     * Checks {@link ScheduledExecutorService#scheduleAtFixedRate} semantics the scheduler relies on:
+     * a task that returns normally keeps running; a task that throws is not run again; a task that
+     * catches its own exception keeps running.
+     */
+    @Test
+    public void testSchedulerPool() throws InterruptedException {
+        ScheduledExecutorService fetchPool = Executors.newScheduledThreadPool(1);
+        try {
+            final CountDownLatch countDownLatch = new CountDownLatch(3);
+            ScheduledFuture<?> future = fetchPool.scheduleAtFixedRate(new Runnable() {
+                @Override
+                public void run() {
+                    countDownLatch.countDown();
+                }
+            }, 5, 5, TimeUnit.SECONDS);
+            assertTrue("countDownLatch should reach zero in 20 secs", countDownLatch.await(20, TimeUnit.SECONDS));
+            assertTrue("future should still be running", future.cancel(true));
+
+            final CountDownLatch countDownLatch2 = new CountDownLatch(3);
+            ScheduledFuture<?> future2 = fetchPool.scheduleAtFixedRate(new Runnable() {
+                @Override
+                public void run() {
+                    countDownLatch2.countDown();
+                    throw new RuntimeException();
+                }
+            }, 5, 5, TimeUnit.SECONDS);
+            assertFalse("countDownLatch2 should NOT reach zero in 20 secs", countDownLatch2.await(20, TimeUnit.SECONDS));
+            assertFalse("future2 should have been stopped", future2.cancel(true));
+
+            final CountDownLatch countDownLatch3 = new CountDownLatch(3);
+            ScheduledFuture<?> future3 = fetchPool.scheduleAtFixedRate(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        countDownLatch3.countDown();
+                        throw new RuntimeException();
+                    } catch (Exception ignored) {
+                        // swallowed on purpose: this case verifies a self-handled exception keeps the task scheduled
+                    }
+                }
+            }, 5, 5, TimeUnit.SECONDS);
+            assertTrue("countDownLatch3 should reach zero in 20 secs", countDownLatch3.await(20, TimeUnit.SECONDS));
+            assertTrue("future3 should still be running", future3.cancel(true));
+        } finally {
+            // release the pool's worker thread even when an assertion above fails
+            fetchPool.shutdownNow();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/core-storage/pom.xml
----------------------------------------------------------------------
diff --git a/core-storage/pom.xml b/core-storage/pom.xml
index 4bb7695..e17d13f 100644
--- a/core-storage/pom.xml
+++ b/core-storage/pom.xml
@@ -42,12 +42,6 @@
         </dependency>
 
         <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-invertedindex</artifactId>
-            <version>${project.parent.version}</version>
-        </dependency>
-
-        <dependency>
             <groupId>net.sf.ehcache</groupId>
             <artifactId>ehcache</artifactId>
             <version>2.8.1</version>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/engine-spark/pom.xml
----------------------------------------------------------------------
diff --git a/engine-spark/pom.xml b/engine-spark/pom.xml
index 3aa01e3..d2b150e 100644
--- a/engine-spark/pom.xml
+++ b/engine-spark/pom.xml
@@ -86,14 +86,6 @@
         </dependency>
 
         <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-job</artifactId>
-            <type>test-jar</type>
-            <scope>test</scope>
-            <version>${project.parent.version}</version>
-        </dependency>
-
-        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index dd87782..896aa80 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Maps;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
 import com.google.common.primitives.UnsignedBytes;
+
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
@@ -66,7 +67,7 @@ import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.apache.kylin.storage.hbase.steps.HBaseConnection;
+import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hbase.steps.CreateHTableJob;
 import org.apache.kylin.storage.hbase.steps.CubeHTableUtil;
 import org.apache.spark.SparkConf;
@@ -80,9 +81,11 @@ import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.DataFrame;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.hive.HiveContext;
+
 import scala.Tuple2;
 
 import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;