You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/23 13:21:57 UTC

[20/23] incubator-kylin git commit: KYLIN-875 half way

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
new file mode 100644
index 0000000..c90c5d3
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
@@ -0,0 +1,381 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.cube.gridtable.CubeCodeSystem;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.NumberDictionaryBuilder;
+import org.apache.kylin.dict.StringBytesConverter;
+import org.apache.kylin.dict.TrieDictionaryBuilder;
+import org.apache.kylin.gridtable.GTInfo.Builder;
+import org.apache.kylin.gridtable.memstore.GTSimpleMemStore;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.ExtractTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class DictGridTableTest {
+
+    @Test
+    public void test() throws IOException {
+        GridTable table = newTestTable();
+        verifyScanRangePlanner(table);
+        verifyFirstRow(table);
+        verifyScanWithUnevaluatableFilter(table);
+        verifyScanWithEvaluatableFilter(table);
+        verifyConvertFilterConstants1(table);
+        verifyConvertFilterConstants2(table);
+        verifyConvertFilterConstants3(table);
+        verifyConvertFilterConstants4(table);
+    }
+
+    private void verifyScanRangePlanner(GridTable table) {
+        GTInfo info = table.getInfo();
+        GTScanRangePlanner planner = new GTScanRangePlanner(info);
+        
+        CompareTupleFilter timeComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
+        CompareTupleFilter timeComp2 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-13"));
+        CompareTupleFilter timeComp3 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-15"));
+        CompareTupleFilter timeComp4 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-15"));
+        CompareTupleFilter ageComp1 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "10"));
+        CompareTupleFilter ageComp2 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "20"));
+        CompareTupleFilter ageComp3 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "30"));
+        CompareTupleFilter ageComp4 = compare(info.colRef(1), FilterOperatorEnum.NEQ, enc(info, 1, "30"));
+        
+        // flatten or-and & hbase fuzzy value
+        {
+            LogicalTupleFilter filter = and(timeComp1, or(ageComp1, ageComp2));
+            List<GTScanRange> r = planner.planScanRanges(filter);
+            assertEquals(1, r.size());
+            assertEquals("[1421193600000, 10]-[null, null]", r.get(0).toString());
+            assertEquals("[[10], [20]]", r.get(0).hbaseFuzzyKeys.toString());
+        }
+        
+        // pre-evaluate ever false
+        {
+            LogicalTupleFilter filter = and(timeComp1, timeComp2);
+            List<GTScanRange> r = planner.planScanRanges(filter);
+            assertEquals(0, r.size());
+        }
+        
+        // pre-evaluate ever true
+        {
+            LogicalTupleFilter filter = or(timeComp1, ageComp4);
+            List<GTScanRange> r = planner.planScanRanges(filter);
+            assertEquals("[[null, null]-[null, null]]", r.toString());
+        }
+        
+        // merge overlap range
+        {
+            LogicalTupleFilter filter = or(timeComp1, timeComp3);
+            List<GTScanRange> r = planner.planScanRanges(filter);
+            assertEquals("[[null, null]-[null, null]]", r.toString());
+        }
+        
+        // merge too many ranges
+        {
+            LogicalTupleFilter filter = or(and(timeComp4, ageComp1), and(timeComp4, ageComp2), and(timeComp4, ageComp3));
+            List<GTScanRange> r = planner.planScanRanges(filter);
+            assertEquals(3, r.size());
+            assertEquals("[1421280000000, 10]-[1421280000000, 10]", r.get(0).toString());
+            assertEquals("[1421280000000, 20]-[1421280000000, 20]", r.get(1).toString());
+            assertEquals("[1421280000000, 30]-[1421280000000, 30]", r.get(2).toString());
+            List<GTScanRange> r2 = planner.planScanRanges(filter, 2);
+            assertEquals("[[1421280000000, 10]-[1421280000000, 30]]", r2.toString());
+        }
+    }
+
+    private void verifyFirstRow(GridTable table) throws IOException {
+        doScanAndVerify(table, new GTScanRequest(table.getInfo()), "[1421193600000, 30, Yang, 10, 10.5]");
+    }
+
+    private void verifyScanWithUnevaluatableFilter(GridTable table) throws IOException {
+        GTInfo info = table.getInfo();
+
+        CompareTupleFilter fComp = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
+        ExtractTupleFilter fUnevaluatable = unevaluatable(info.colRef(1));
+        LogicalTupleFilter fNotPlusUnevaluatable = not(unevaluatable(info.colRef(1)));
+        LogicalTupleFilter filter = and(fComp, fUnevaluatable, fNotPlusUnevaluatable);
+
+        GTScanRequest req = new GTScanRequest(info, null, setOf(0), setOf(3), new String[] { "sum" }, filter);
+
+        // note the unEvaluatable column 1 in filter is added to group by
+        assertEquals("GTScanRequest [range=[null, null]-[null, null], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [null], [null]], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
+        
+        doScanAndVerify(table, req, "[1421280000000, 20, null, 20, null]");
+    }
+    
+    private void verifyScanWithEvaluatableFilter(GridTable table) throws IOException {
+        GTInfo info = table.getInfo();
+
+        CompareTupleFilter fComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
+        CompareTupleFilter fComp2 = compare(info.colRef(1), FilterOperatorEnum.GT, enc(info, 1, "10"));
+        LogicalTupleFilter filter = and(fComp1, fComp2);
+
+        GTScanRequest req = new GTScanRequest(info, null, setOf(0), setOf(3), new String[] { "sum" }, filter);
+        
+        // note the evaluatable column 1 in filter is added to returned columns but not in group by
+        assertEquals("GTScanRequest [range=[null, null]-[null, null], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
+        
+        doScanAndVerify(table, req, "[1421280000000, 20, null, 30, null]", "[1421366400000, 20, null, 40, null]");
+    }
+
+    private void verifyConvertFilterConstants1(GridTable table) {
+        GTInfo info = table.getInfo();
+        
+        TableDesc extTable = TableDesc.mockup("ext");
+        TblColRef extColA = new TblColRef(ColumnDesc.mockup(extTable, 1, "A", "timestamp"));
+        TblColRef extColB = new TblColRef(ColumnDesc.mockup(extTable, 2, "B", "integer"));
+
+        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+        CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.EQ, "10");
+        LogicalTupleFilter filter = and(fComp1, fComp2);
+        
+        List<TblColRef> colMapping = Lists.newArrayList();
+        colMapping.add(extColA);
+        colMapping.add(extColB);
+        
+        TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+        assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 EQ [\\x00]]", newFilter.toString());
+    }
+
+    private void verifyConvertFilterConstants2(GridTable table) {
+        GTInfo info = table.getInfo();
+        
+        TableDesc extTable = TableDesc.mockup("ext");
+        TblColRef extColA = new TblColRef(ColumnDesc.mockup(extTable, 1, "A", "timestamp"));
+        TblColRef extColB = new TblColRef(ColumnDesc.mockup(extTable, 2, "B", "integer"));
+        
+        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+        CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.LT, "9");
+        LogicalTupleFilter filter = and(fComp1, fComp2);
+        
+        List<TblColRef> colMapping = Lists.newArrayList();
+        colMapping.add(extColA);
+        colMapping.add(extColB);
+        
+        // $1<"9" round up to $1<"10"
+        TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+        assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 LT [\\x00]]", newFilter.toString());
+    }
+    
+    private void verifyConvertFilterConstants3(GridTable table) {
+        GTInfo info = table.getInfo();
+        
+        TableDesc extTable = TableDesc.mockup("ext");
+        TblColRef extColA = new TblColRef(ColumnDesc.mockup(extTable, 1, "A", "timestamp"));
+        TblColRef extColB = new TblColRef(ColumnDesc.mockup(extTable, 2, "B", "integer"));
+        
+        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+        CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.LTE, "9");
+        LogicalTupleFilter filter = and(fComp1, fComp2);
+        
+        List<TblColRef> colMapping = Lists.newArrayList();
+        colMapping.add(extColA);
+        colMapping.add(extColB);
+        
+        // $1<="9" round down to FALSE
+        TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+        assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], []]", newFilter.toString());
+    }
+    
+    private void verifyConvertFilterConstants4(GridTable table) {
+        GTInfo info = table.getInfo();
+        
+        TableDesc extTable = TableDesc.mockup("ext");
+        TblColRef extColA = new TblColRef(ColumnDesc.mockup(extTable, 1, "A", "timestamp"));
+        TblColRef extColB = new TblColRef(ColumnDesc.mockup(extTable, 2, "B", "integer"));
+        
+        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+        CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.IN, "9", "10", "15");
+        LogicalTupleFilter filter = and(fComp1, fComp2);
+        
+        List<TblColRef> colMapping = Lists.newArrayList();
+        colMapping.add(extColA);
+        colMapping.add(extColB);
+        
+        // $1 in ("9", "10", "15") has only "10" left
+        TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+        assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 IN [\\x00]]", newFilter.toString());
+    }
+    
+    private void doScanAndVerify(GridTable table, GTScanRequest req, String... verifyRows) throws IOException {
+        System.out.println(req);
+        IGTScanner scanner = table.scan(req);
+        int i = 0;
+        for (GTRecord r : scanner) {
+            System.out.println(r);
+            if (verifyRows != null && i < verifyRows.length) {
+                assertEquals(verifyRows[i], r.toString());
+            }
+            i++;
+        }
+        scanner.close();
+    }
+
+    private Object enc(GTInfo info, int col, String value) {
+        ByteBuffer buf = ByteBuffer.allocate(info.getMaxColumnLength());
+        info.codeSystem.encodeColumnValue(col, value, buf);
+        return ByteArray.copyOf(buf.array(), buf.arrayOffset(), buf.position());
+    }
+
+    private ExtractTupleFilter unevaluatable(TblColRef col) {
+        ExtractTupleFilter r = new ExtractTupleFilter(FilterOperatorEnum.EXTRACT);
+        r.addChild(new ColumnTupleFilter(col));
+        return r;
+    }
+
+    private CompareTupleFilter compare(TblColRef col, FilterOperatorEnum op, Object... value) {
+        CompareTupleFilter result = new CompareTupleFilter(op);
+        result.addChild(new ColumnTupleFilter(col));
+        result.addChild(new ConstantTupleFilter(Arrays.asList(value)));
+        return result;
+    }
+
+    private LogicalTupleFilter and(TupleFilter... children) {
+        return logic(FilterOperatorEnum.AND, children);
+    }
+
+    private LogicalTupleFilter or(TupleFilter... children) {
+        return logic(FilterOperatorEnum.OR, children);
+    }
+
+    private LogicalTupleFilter not(TupleFilter child) {
+        return logic(FilterOperatorEnum.NOT, child);
+    }
+
+    private LogicalTupleFilter logic(FilterOperatorEnum op, TupleFilter... children) {
+        LogicalTupleFilter result = new LogicalTupleFilter(op);
+        for (TupleFilter c : children) {
+            result.addChild(c);
+        }
+        return result;
+    }
+
+    static GridTable newTestTable() throws IOException {
+        GTInfo info = newInfo();
+        GTSimpleMemStore store = new GTSimpleMemStore(info);
+        GridTable table = new GridTable(info, store);
+
+        GTRecord r = new GTRecord(table.getInfo());
+        GTBuilder builder = table.rebuild();
+
+        builder.write(r.setValues("2015-01-14", "30", "Yang", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-14", "30", "Luke", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-15", "20", "Dong", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-15", "20", "Jason", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-15", "30", "Xu", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-16", "20", "Mahone", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-16", "20", "Qianhao", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-16", "30", "George", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-17", "10", "Kejia", new LongMutable(10), new BigDecimal("10.5")));
+        builder.close();
+
+        return table;
+    }
+
+    static GTInfo newInfo() {
+        Builder builder = GTInfo.builder();
+        builder.setCodeSystem(newDictCodeSystem());
+        builder.setColumns( //
+                DataType.getInstance("timestamp"), //
+                DataType.getInstance("integer"), //
+                DataType.getInstance("varchar(10)"), //
+                DataType.getInstance("bigint"), //
+                DataType.getInstance("decimal") //
+        );
+        builder.setPrimaryKey(setOf(0, 1));
+        builder.setColumnPreferIndex(setOf(0));
+        builder.enableColumnBlock(new ImmutableBitSet[] { setOf(0, 1), setOf(2), setOf(3, 4) });
+        builder.enableRowBlock(4);
+        GTInfo info = builder.build();
+        return info;
+    }
+
+    @SuppressWarnings("rawtypes")
+    private static CubeCodeSystem newDictCodeSystem() {
+        Map<Integer, Dictionary> dictionaryMap = Maps.newHashMap();
+        dictionaryMap.put(1, newDictionaryOfInteger());
+        dictionaryMap.put(2, newDictionaryOfString());
+        return new CubeCodeSystem(dictionaryMap);
+    }
+
+    @SuppressWarnings("rawtypes")
+    private static Dictionary newDictionaryOfString() {
+        TrieDictionaryBuilder<String> builder = new TrieDictionaryBuilder<>(new StringBytesConverter());
+        builder.addValue("Dong");
+        builder.addValue("George");
+        builder.addValue("Jason");
+        builder.addValue("Kejia");
+        builder.addValue("Luke");
+        builder.addValue("Mahone");
+        builder.addValue("Qianhao");
+        builder.addValue("Shaofeng");
+        builder.addValue("Xu");
+        builder.addValue("Yang");
+        return builder.build(0);
+    }
+
+    @SuppressWarnings("rawtypes")
+    private static Dictionary newDictionaryOfInteger() {
+        NumberDictionaryBuilder<String> builder = new NumberDictionaryBuilder<>(new StringBytesConverter());
+        builder.addValue("10");
+        builder.addValue("20");
+        builder.addValue("30");
+        builder.addValue("40");
+        builder.addValue("50");
+        builder.addValue("60");
+        builder.addValue("70");
+        builder.addValue("80");
+        builder.addValue("90");
+        builder.addValue("100");
+        return builder.build(0);
+    }
+
+    private static ImmutableBitSet setOf(int... values) {
+        BitSet set = new BitSet();
+        for (int i : values)
+            set.set(i);
+        return new ImmutableBitSet(set);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
new file mode 100644
index 0000000..c87e970
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
@@ -0,0 +1,195 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.gridtable.GTBuilder;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GridTable;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.gridtable.UnitTestSupport;
+import org.apache.kylin.gridtable.memstore.GTSimpleMemStore;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.junit.Test;
+
+public class SimpleGridTableTest {
+
+    @Test
+    public void testBasics() throws IOException {
+        GTInfo info = UnitTestSupport.basicInfo();
+        GTSimpleMemStore store = new GTSimpleMemStore(info);
+        GridTable table = new GridTable(info, store);
+
+        GTBuilder builder = rebuild(table);
+        IGTScanner scanner = scan(table);
+        assertEquals(builder.getWrittenRowBlockCount(), scanner.getScannedRowBlockCount());
+        assertEquals(builder.getWrittenRowCount(), scanner.getScannedRowCount());
+    }
+
+    @Test
+    public void testAdvanced() throws IOException {
+        GTInfo info = UnitTestSupport.advancedInfo();
+        GTSimpleMemStore store = new GTSimpleMemStore(info);
+        GridTable table = new GridTable(info, store);
+
+        GTBuilder builder = rebuild(table);
+        IGTScanner scanner = scan(table);
+        assertEquals(builder.getWrittenRowBlockCount(), scanner.getScannedRowBlockCount());
+        assertEquals(builder.getWrittenRowCount(), scanner.getScannedRowCount());
+    }
+
+    @Test
+    public void testAggregate() throws IOException {
+        GTInfo info = UnitTestSupport.advancedInfo();
+        GTSimpleMemStore store = new GTSimpleMemStore(info);
+        GridTable table = new GridTable(info, store);
+
+        GTBuilder builder = rebuild(table);
+        IGTScanner scanner = scanAndAggregate(table);
+        assertEquals(builder.getWrittenRowBlockCount(), scanner.getScannedRowBlockCount());
+        assertEquals(builder.getWrittenRowCount(), scanner.getScannedRowCount());
+    }
+
+    @Test
+    public void testAppend() throws IOException {
+        GTInfo info = UnitTestSupport.advancedInfo();
+        GTSimpleMemStore store = new GTSimpleMemStore(info);
+        GridTable table = new GridTable(info, store);
+
+        rebuildViaAppend(table);
+        IGTScanner scanner = scan(table);
+        assertEquals(3, scanner.getScannedRowBlockCount());
+        assertEquals(10, scanner.getScannedRowCount());
+    }
+
+    private IGTScanner scan(GridTable table) throws IOException {
+        GTScanRequest req = new GTScanRequest(table.getInfo());
+        IGTScanner scanner = table.scan(req);
+        for (GTRecord r : scanner) {
+            Object[] v = r.getValues();
+            assertTrue(((String) v[0]).startsWith("2015-"));
+            assertTrue(((String) v[2]).equals("Food"));
+            assertTrue(((LongMutable) v[3]).get() == 10);
+            assertTrue(((BigDecimal) v[4]).doubleValue() == 10.5);
+            System.out.println(r);
+        }
+        scanner.close();
+        System.out.println("Scanned Row Block Count: " + scanner.getScannedRowBlockCount());
+        System.out.println("Scanned Row Count: " + scanner.getScannedRowCount());
+        return scanner;
+    }
+
+    private IGTScanner scanAndAggregate(GridTable table) throws IOException {
+        GTScanRequest req = new GTScanRequest(table.getInfo(), null, setOf(0, 2), setOf(3, 4), new String[] { "count", "sum" }, null);
+        IGTScanner scanner = table.scan(req);
+        int i = 0;
+        for (GTRecord r : scanner) {
+            Object[] v = r.getValues();
+            switch (i) {
+            case 0:
+                assertTrue(((LongMutable) v[3]).get() == 20);
+                assertTrue(((BigDecimal) v[4]).doubleValue() == 21.0);
+                break;
+            case 1:
+                assertTrue(((LongMutable) v[3]).get() == 30);
+                assertTrue(((BigDecimal) v[4]).doubleValue() == 31.5);
+                break;
+            case 2:
+                assertTrue(((LongMutable) v[3]).get() == 40);
+                assertTrue(((BigDecimal) v[4]).doubleValue() == 42.0);
+                break;
+            case 3:
+                assertTrue(((LongMutable) v[3]).get() == 10);
+                assertTrue(((BigDecimal) v[4]).doubleValue() == 10.5);
+                break;
+            default:
+                fail();
+            }
+            i++;
+            System.out.println(r);
+        }
+        scanner.close();
+        System.out.println("Scanned Row Block Count: " + scanner.getScannedRowBlockCount());
+        System.out.println("Scanned Row Count: " + scanner.getScannedRowCount());
+        return scanner;
+    }
+
+    static GTBuilder rebuild(GridTable table) throws IOException {
+        GTBuilder builder = table.rebuild();
+        for (GTRecord rec : UnitTestSupport.mockupData(table.getInfo(), 10)) {
+            builder.write(rec);
+        }
+        builder.close();
+
+        System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
+        System.out.println("Written Row Count: " + builder.getWrittenRowCount());
+        return builder;
+    }
+    
+    static void rebuildViaAppend(GridTable table) throws IOException {
+        List<GTRecord> data = UnitTestSupport.mockupData(table.getInfo(), 10);
+        GTBuilder builder;
+        int i = 0;
+
+        builder = table.append();
+        builder.write(data.get(i++));
+        builder.write(data.get(i++));
+        builder.write(data.get(i++));
+        builder.write(data.get(i++));
+        builder.close();
+        System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
+        System.out.println("Written Row Count: " + builder.getWrittenRowCount());
+
+        builder = table.append();
+        builder.write(data.get(i++));
+        builder.write(data.get(i++));
+        builder.write(data.get(i++));
+        builder.close();
+        System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
+        System.out.println("Written Row Count: " + builder.getWrittenRowCount());
+
+        builder = table.append();
+        builder.write(data.get(i++));
+        builder.write(data.get(i++));
+        builder.close();
+        System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
+        System.out.println("Written Row Count: " + builder.getWrittenRowCount());
+
+        builder = table.append();
+        builder.write(data.get(i++));
+        builder.close();
+        System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
+        System.out.println("Written Row Count: " + builder.getWrittenRowCount());
+    }
+
+    private static ImmutableBitSet setOf(int... values) {
+        BitSet set = new BitSet();
+        for (int i : values)
+            set.set(i);
+        return new ImmutableBitSet(set);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java
new file mode 100644
index 0000000..f5c9645
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java
@@ -0,0 +1,188 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+import static org.junit.Assert.*;
+import it.uniroma3.mat.extendedset.intset.ConciseSet;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTInvertedIndex;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTRowBlock;
+import org.apache.kylin.gridtable.UnitTestSupport;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.apache.kylin.metadata.measure.serializer.StringSerializer;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class SimpleInvertedIndexTest {
+
+    GTInfo info;
+    GTInvertedIndex index;
+    ArrayList<CompareTupleFilter> basicFilters = Lists.newArrayList();
+    ArrayList<ConciseSet> basicResults = Lists.newArrayList();
+
+    public SimpleInvertedIndexTest() {
+        
+        info = UnitTestSupport.advancedInfo();
+        TblColRef colA = info.colRef(0);
+        
+        // block i contains value "i", the last is NULL
+        index = new GTInvertedIndex(info);
+        GTRowBlock mockBlock = GTRowBlock.allocate(info);
+        GTRowBlock.Writer writer = mockBlock.getWriter();
+        GTRecord record = new GTRecord(info);
+        for (int i = 0; i < 10; i++) {
+            record.setValues(i < 9 ? "" + i : null, "", "", new LongMutable(0), new BigDecimal(0));
+            for (int j = 0; j < info.getRowBlockSize(); j++) {
+                writer.append(record);
+            }
+            writer.readyForFlush();
+            index.add(mockBlock);
+            
+            writer.clearForNext();
+        }
+        
+        basicFilters.add(compare(colA, FilterOperatorEnum.ISNULL));
+        basicResults.add(set(9));
+
+        basicFilters.add(compare(colA, FilterOperatorEnum.ISNOTNULL));
+        basicResults.add(set(0, 1, 2, 3, 4, 5, 6, 7, 8, 9));
+
+        basicFilters.add(compare(colA, FilterOperatorEnum.EQ, 0));
+        basicResults.add(set(0));
+
+        basicFilters.add(compare(colA, FilterOperatorEnum.NEQ, 0));
+        basicResults.add(set(0, 1, 2, 3, 4, 5, 6, 7, 8, 9));
+
+        basicFilters.add(compare(colA, FilterOperatorEnum.IN, 0, 5));
+        basicResults.add(set(0, 5));
+
+        basicFilters.add(compare(colA, FilterOperatorEnum.NOTIN, 0, 5));
+        basicResults.add(set(0, 1, 2, 3, 4, 5, 6, 7, 8, 9));
+
+        basicFilters.add(compare(colA, FilterOperatorEnum.LT, 3));
+        basicResults.add(set(0, 1, 2));
+
+        basicFilters.add(compare(colA, FilterOperatorEnum.LTE, 3));
+        basicResults.add(set(0, 1, 2, 3));
+
+        basicFilters.add(compare(colA, FilterOperatorEnum.GT, 3));
+        basicResults.add(set(4, 5, 6, 7, 8));
+
+        basicFilters.add(compare(colA, FilterOperatorEnum.GTE, 3));
+        basicResults.add(set(3, 4, 5, 6, 7, 8));
+    }
+
+    @Test
+    public void testBasics() {
+        for (int i = 0; i < basicFilters.size(); i++) {
+            assertEquals(basicResults.get(i), index.filter(basicFilters.get(i)));
+        }
+    }
+
+    @Test
+    public void testLogicalAnd() {
+        for (int i = 0; i < basicFilters.size(); i++) {
+            for (int j = 0; j < basicFilters.size(); j++) {
+                LogicalTupleFilter f = logical(FilterOperatorEnum.AND, basicFilters.get(i), basicFilters.get(j));
+                ConciseSet r = basicResults.get(i).clone();
+                r.retainAll(basicResults.get(j));
+                assertEquals(r, index.filter(f));
+            }
+        }
+    }
+
+    @Test
+    public void testLogicalOr() {
+        for (int i = 0; i < basicFilters.size(); i++) {
+            for (int j = 0; j < basicFilters.size(); j++) {
+                LogicalTupleFilter f = logical(FilterOperatorEnum.OR, basicFilters.get(i), basicFilters.get(j));
+                ConciseSet r = basicResults.get(i).clone();
+                r.addAll(basicResults.get(j));
+                assertEquals(r, index.filter(f));
+            }
+        }
+    }
+
+    @Test
+    public void testNotEvaluable() {
+        ConciseSet all = set(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+        
+        CompareTupleFilter notEvaluable = compare(info.colRef(1), FilterOperatorEnum.EQ, 0);
+        assertEquals(all, index.filter(notEvaluable));
+
+        LogicalTupleFilter or = logical(FilterOperatorEnum.OR, basicFilters.get(0), notEvaluable);
+        assertEquals(all, index.filter(or));
+
+        LogicalTupleFilter and = logical(FilterOperatorEnum.AND, basicFilters.get(0), notEvaluable);
+        assertEquals(basicResults.get(0), index.filter(and));
+    }
+
+    public static CompareTupleFilter compare(TblColRef col, TupleFilter.FilterOperatorEnum op, int... ids) {
+        CompareTupleFilter filter = new CompareTupleFilter(op);
+        filter.addChild(columnFilter(col));
+        for (int i : ids) {
+            filter.addChild(constFilter(i));
+        }
+        return filter;
+    }
+
+    public static LogicalTupleFilter logical(TupleFilter.FilterOperatorEnum op, TupleFilter... filters) {
+        LogicalTupleFilter filter = new LogicalTupleFilter(op);
+        for (TupleFilter f : filters)
+            filter.addChild(f);
+        return filter;
+    }
+
+    public static ColumnTupleFilter columnFilter(TblColRef col) {
+        return new ColumnTupleFilter(col);
+    }
+
+    public static ConstantTupleFilter constFilter(int id) {
+        byte[] space = new byte[10];
+        ByteBuffer buf = ByteBuffer.wrap(space);
+        StringSerializer stringSerializer = new StringSerializer(DataType.getInstance("string"));
+        stringSerializer.serialize("" + id, buf);
+        ByteArray data = new ByteArray(buf.array(), buf.arrayOffset(), buf.position());
+        return new ConstantTupleFilter(data);
+    }
+
+    public static ConciseSet set(int... ints) {
+        ConciseSet set = new ConciseSet();
+        for (int i : ints)
+            set.add(i);
+        return set;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-job/pom.xml
----------------------------------------------------------------------
diff --git a/core-job/pom.xml b/core-job/pom.xml
index b51c1cf..95339f6 100644
--- a/core-job/pom.xml
+++ b/core-job/pom.xml
@@ -41,6 +41,11 @@
             <version>${project.parent.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+        </dependency>
+
         <!-- Env & Test -->
         <dependency>
             <groupId>junit</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java b/core-job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
new file mode 100644
index 0000000..6b814ef
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public class BuildEngineFactory {
+    
+    private static IBatchCubingEngine defaultBatchEngine;
+    
+    public static IBatchCubingEngine defaultBatchEngine() {
+        if (defaultBatchEngine == null) {
+            KylinConfig conf = KylinConfig.getInstanceFromEnv();
+            if (conf.isCubingInMem()) {
+                defaultBatchEngine = (IBatchCubingEngine) ClassUtil.newInstance("org.apache.kylin.engine.mr.MRBatchCubingEngine2");
+            } else {
+                defaultBatchEngine = (IBatchCubingEngine) ClassUtil.newInstance("org.apache.kylin.engine.mr.MRBatchCubingEngine");
+            }
+        }
+        return defaultBatchEngine;
+    }
+    
+    /** Build a new cube segment, typically its time range appends to the end of current cube. */
+    public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
+        return defaultBatchEngine().createBatchCubingJob(newSegment, submitter);
+    }
+    
+    /** Merge multiple small segments into a big one. */
+    public static DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
+        return defaultBatchEngine().createBatchMergeJob(mergeSegment, submitter);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java b/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
new file mode 100644
index 0000000..904f557
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public interface IBatchCubingEngine {
+
+    /** Build a new cube segment, typically its time range appends to the end of current cube. */
+    public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter);
+    
+    /** Merge multiple small segments into a big one. */
+    public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter);
+    
+    public Class<?> getSourceInterface();
+    
+    public Class<?> getStorageInterface();
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java b/core-job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java
new file mode 100644
index 0000000..0359ce9
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java
@@ -0,0 +1,8 @@
+package org.apache.kylin.engine;
+
+import org.apache.kylin.cube.CubeSegment;
+
+public interface IStreamingCubingEngine {
+
+    public Runnable createStreamingCubingBuilder(CubeSegment seg);
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JobInstance.java b/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
new file mode 100644
index 0000000..e7f5772
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonBackReference;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonManagedReference;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Lists;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.cube.model.CubeBuildTypeEnum;
+import org.apache.kylin.job.constant.JobStatusEnum;
+import org.apache.kylin.job.constant.JobStepCmdTypeEnum;
+import org.apache.kylin.job.constant.JobStepStatusEnum;
+import org.apache.kylin.job.engine.JobEngineConfig;
+
+@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
+public class JobInstance extends RootPersistentEntity implements Comparable<JobInstance> {
+
+    public static final String JOB_WORKING_DIR_PREFIX = "kylin-";
+
+    public static final String YARN_APP_ID = "yarn_application_id";
+    public static final String YARN_APP_URL = "yarn_application_tracking_url";
+    public static final String MR_JOB_ID = "mr_job_id";
+    public static final String HDFS_BYTES_WRITTEN = "hdfs_bytes_written";
+    public static final String SOURCE_RECORDS_COUNT = "source_records_count";
+    public static final String SOURCE_RECORDS_SIZE = "source_records_size";
+
+    public static String getStepIdentity(JobInstance jobInstance, JobStep jobStep) {
+        return jobInstance.getRelatedCube() + "." + jobInstance.getUuid() + "." + jobStep.getSequenceID();
+    }
+
+    public static String getJobIdentity(JobInstance jobInstance) {
+        return jobInstance.getRelatedCube() + "." + jobInstance.getUuid();
+    }
+
+    public static String getJobWorkingDir(JobInstance jobInstance, JobEngineConfig engineConfig) {
+        return getJobWorkingDir(jobInstance.getUuid(), engineConfig.getHdfsWorkingDirectory());
+    }
+
+    public static String getJobWorkingDir(String jobUuid, String hdfsWorkdingDir) {
+        if (jobUuid == null || jobUuid.equals("")) {
+            throw new IllegalArgumentException("jobUuid can't be null or empty");
+        }
+        return hdfsWorkdingDir + "/" + JOB_WORKING_DIR_PREFIX + jobUuid;
+    }
+
+    @JsonProperty("name")
+    private String name;
+
+    @JsonProperty("type")
+    private CubeBuildTypeEnum type; // java implementation
+    @JsonProperty("duration")
+    private long duration;
+    @JsonProperty("related_cube")
+    private String relatedCube;
+    @JsonProperty("related_segment")
+    private String relatedSegment;
+    @JsonProperty("exec_start_time")
+    private long execStartTime;
+    @JsonProperty("exec_end_time")
+    private long execEndTime;
+    @JsonProperty("mr_waiting")
+    private long mrWaiting = 0;
+    @JsonManagedReference
+    @JsonProperty("steps")
+    private List<JobStep> steps;
+    @JsonProperty("submitter")
+    private String submitter;
+    @JsonProperty("job_status")
+    private JobStatusEnum status;
+
+    public JobStep getRunningStep() {
+        for (JobStep step : this.getSteps()) {
+            if (step.getStatus().equals(JobStepStatusEnum.RUNNING) || step.getStatus().equals(JobStepStatusEnum.WAITING)) {
+                return step;
+            }
+        }
+
+        return null;
+    }
+
+    @JsonProperty("progress")
+    public double getProgress() {
+        int completedStepCount = 0;
+        for (JobStep step : this.getSteps()) {
+            if (step.getStatus().equals(JobStepStatusEnum.FINISHED)) {
+                completedStepCount++;
+            }
+        }
+
+        return 100.0 * completedStepCount / steps.size();
+    }
+
+    public JobStatusEnum getStatus() {
+        return this.status;
+    }
+
+    public void setStatus(JobStatusEnum status) {
+        this.status = status;
+    }
+
+//    @JsonProperty("job_status")
+//    public JobStatusEnum getStatus() {
+//
+//        // JobStatusEnum finalJobStatus;
+//        int compositResult = 0;
+//
+//        // if steps status are all NEW, then job status is NEW
+//        // if steps status are all FINISHED, then job status is FINISHED
+//        // if steps status are all PENDING, then job status is PENDING
+//        // if steps status are FINISHED and PENDING, the job status is PENDING
+//        // if one of steps status is RUNNING, then job status is RUNNING
+//        // if one of steps status is ERROR, then job status is ERROR
+//        // if one of steps status is KILLED, then job status is KILLED
+//        // default status is RUNNING
+//
+//        System.out.println(this.getName());
+//
+//        for (JobStep step : this.getSteps()) {
+//            //System.out.println("step: " + step.getSequenceID() + "'s status:" + step.getStatus());
+//            compositResult = compositResult | step.getStatus().getCode();
+//        }
+//
+//        System.out.println();
+//
+//        if (compositResult == JobStatusEnum.FINISHED.getCode()) {
+//            return JobStatusEnum.FINISHED;
+//        } else if (compositResult == JobStatusEnum.NEW.getCode()) {
+//            return JobStatusEnum.NEW;
+//        } else if (compositResult == JobStatusEnum.PENDING.getCode()) {
+//            return JobStatusEnum.PENDING;
+//        } else if (compositResult == (JobStatusEnum.FINISHED.getCode() | JobStatusEnum.PENDING.getCode())) {
+//            return JobStatusEnum.PENDING;
+//        } else if ((compositResult & JobStatusEnum.ERROR.getCode()) == JobStatusEnum.ERROR.getCode()) {
+//            return JobStatusEnum.ERROR;
+//        } else if ((compositResult & JobStatusEnum.DISCARDED.getCode()) == JobStatusEnum.DISCARDED.getCode()) {
+//            return JobStatusEnum.DISCARDED;
+//        } else if ((compositResult & JobStatusEnum.RUNNING.getCode()) == JobStatusEnum.RUNNING.getCode()) {
+//            return JobStatusEnum.RUNNING;
+//        }
+//
+//        return JobStatusEnum.RUNNING;
+//    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public CubeBuildTypeEnum getType() {
+        return type;
+    }
+
+    public void setType(CubeBuildTypeEnum type) {
+        this.type = type;
+    }
+
+    public long getDuration() {
+        return duration;
+    }
+
+    public void setDuration(long duration) {
+        this.duration = duration;
+    }
+
+    public String getRelatedCube() {
+        return relatedCube;
+    }
+
+    public void setRelatedCube(String relatedCube) {
+        this.relatedCube = relatedCube;
+    }
+
+    public String getRelatedSegment() {
+        return relatedSegment;
+    }
+
+    public void setRelatedSegment(String relatedSegment) {
+        this.relatedSegment = relatedSegment;
+    }
+
+    /**
+     * @return the execStartTime
+     */
+    public long getExecStartTime() {
+        return execStartTime;
+    }
+
+    /**
+     * @param execStartTime the execStartTime to set
+     */
+    public void setExecStartTime(long execStartTime) {
+        this.execStartTime = execStartTime;
+    }
+
+    /**
+     * @return the execEndTime
+     */
+    public long getExecEndTime() {
+        return execEndTime;
+    }
+
+    /**
+     * @param execEndTime the execEndTime to set
+     */
+    public void setExecEndTime(long execEndTime) {
+        this.execEndTime = execEndTime;
+    }
+
+    public long getMrWaiting() {
+        return this.mrWaiting;
+    }
+
+    public void setMrWaiting(long mrWaiting) {
+        this.mrWaiting = mrWaiting;
+    }
+
+    public List<JobStep> getSteps() {
+        if (steps == null) {
+            steps = Lists.newArrayList();
+        }
+        return steps;
+    }
+
+    public void clearSteps() {
+        getSteps().clear();
+    }
+
+    public void addSteps(Collection<JobStep> steps) {
+        this.getSteps().addAll(steps);
+    }
+
+    public void addStep(JobStep step) {
+        getSteps().add(step);
+    }
+
+    public void addStep(int index, JobStep step) {
+        getSteps().add(index, step);
+    }
+
+    public JobStep findStep(String stepName) {
+        for (JobStep step : getSteps()) {
+            if (stepName.equals(step.getName())) {
+                return step;
+            }
+        }
+        return null;
+    }
+
+        
+    public String getSubmitter() {
+        return submitter;
+    }
+
+    public void setSubmitter(String submitter) {
+        this.submitter = submitter;
+    }
+
+
+
+
+    @JsonIgnoreProperties(ignoreUnknown = true)
+    public static class JobStep implements Comparable<JobStep> {
+
+        @JsonBackReference
+        private JobInstance jobInstance;
+        
+        @JsonProperty("id")
+        private String id;
+
+        @JsonProperty("name")
+        private String name;
+
+        @JsonProperty("sequence_id")
+        private int sequenceID;
+
+        @JsonProperty("exec_cmd")
+        private String execCmd;
+
+        @JsonProperty("interrupt_cmd")
+        private String InterruptCmd;
+
+        @JsonProperty("exec_start_time")
+        private long execStartTime;
+        @JsonProperty("exec_end_time")
+        private long execEndTime;
+        @JsonProperty("exec_wait_time")
+        private long execWaitTime;
+
+        @JsonProperty("step_status")
+        private JobStepStatusEnum status;
+
+        @JsonProperty("cmd_type")
+        private JobStepCmdTypeEnum cmdType = JobStepCmdTypeEnum.SHELL_CMD_HADOOP;
+
+        @JsonProperty("info")
+        private ConcurrentHashMap<String, String> info = new ConcurrentHashMap<String, String>();
+
+        @JsonProperty("run_async")
+        private boolean runAsync = false;
+
+        private ConcurrentHashMap<String, String> getInfo() {
+            return info;
+        }
+
+        public void putInfo(String key, String value) {
+            getInfo().put(key, value);
+        }
+
+        public String getInfo(String key) {
+            return getInfo().get(key);
+        }
+
+        public void clearInfo() {
+            getInfo().clear();
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public void setName(String name) {
+            this.name = name;
+        }
+
+        public int getSequenceID() {
+            return sequenceID;
+        }
+
+        public void setSequenceID(int sequenceID) {
+            this.sequenceID = sequenceID;
+        }
+
+        public String getExecCmd() {
+            return execCmd;
+        }
+
+        public void setExecCmd(String execCmd) {
+            this.execCmd = execCmd;
+        }
+
+        public JobStepStatusEnum getStatus() {
+            return status;
+        }
+
+        public void setStatus(JobStepStatusEnum status) {
+            this.status = status;
+        }
+        
+        
+
+        public String getId() {
+            return id;
+        }
+
+        public void setId(String id) {
+            this.id = id;
+        }
+
+        /**
+         * @return the execStartTime
+         */
+        public long getExecStartTime() {
+            return execStartTime;
+        }
+
+        /**
+         * @param execStartTime the execStartTime to set
+         */
+        public void setExecStartTime(long execStartTime) {
+            this.execStartTime = execStartTime;
+        }
+
+        /**
+         * @return the execEndTime
+         */
+        public long getExecEndTime() {
+            return execEndTime;
+        }
+
+        /**
+         * @param execEndTime the execEndTime to set
+         */
+        public void setExecEndTime(long execEndTime) {
+            this.execEndTime = execEndTime;
+        }
+
+        public long getExecWaitTime() {
+            return execWaitTime;
+        }
+
+        public void setExecWaitTime(long execWaitTime) {
+            this.execWaitTime = execWaitTime;
+        }
+
+        public String getInterruptCmd() {
+            return InterruptCmd;
+        }
+
+        public void setInterruptCmd(String interruptCmd) {
+            InterruptCmd = interruptCmd;
+        }
+
+        public JobStepCmdTypeEnum getCmdType() {
+            return cmdType;
+        }
+
+        public void setCmdType(JobStepCmdTypeEnum cmdType) {
+            this.cmdType = cmdType;
+        }
+
+        /**
+         * @return the runAsync
+         */
+        public boolean isRunAsync() {
+            return runAsync;
+        }
+
+        /**
+         * @param runAsync the runAsync to set
+         */
+        public void setRunAsync(boolean runAsync) {
+            this.runAsync = runAsync;
+        }
+
+        /**
+         * @return the jobInstance
+         */
+        public JobInstance getJobInstance() {
+            return jobInstance;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + ((name == null) ? 0 : name.hashCode());
+            result = prime * result + sequenceID;
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj)
+                return true;
+            if (obj == null)
+                return false;
+            if (getClass() != obj.getClass())
+                return false;
+            JobStep other = (JobStep) obj;
+            if (name == null) {
+                if (other.name != null)
+                    return false;
+            } else if (!name.equals(other.name))
+                return false;
+            if (sequenceID != other.sequenceID)
+                return false;
+            return true;
+        }
+
+        @Override
+        public int compareTo(JobStep o) {
+            if (this.sequenceID < o.sequenceID) {
+                return -1;
+            } else if (this.sequenceID > o.sequenceID) {
+                return 1;
+            } else {
+                return 0;
+            }
+        }
+    }
+
+    @Override
+    public int compareTo(JobInstance o) {
+        return o.lastModified<this.lastModified?-1:o.lastModified>this.lastModified?1:0;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
new file mode 100644
index 0000000..932da38
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.IntermediateColumnDesc;
+import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.LookupDesc;
+import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.w3c.dom.Document;
+import org.w3c.dom.NodeList;
+import org.xml.sax.SAXException;
+
+/**
+ * @author George Song (ysong1)
+ * 
+ */
+
+public class JoinedFlatTable {
+
+    public static final String FACT_TABLE_ALIAS = "FACT_TABLE";
+
+    public static final String LOOKUP_TABLE_ALAIS_PREFIX = "LOOKUP_";
+
+    public static String getTableDir(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir) {
+        return storageDfsDir + "/" + intermediateTableDesc.getTableName();
+    }
+
+    public static String generateCreateTableStatement(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir) {
+        StringBuilder ddl = new StringBuilder();
+
+        ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + intermediateTableDesc.getTableName() + "\n");
+
+        ddl.append("(" + "\n");
+        for (int i = 0; i < intermediateTableDesc.getColumnList().size(); i++) {
+            IntermediateColumnDesc col = intermediateTableDesc.getColumnList().get(i);
+            if (i > 0) {
+                ddl.append(",");
+            }
+            ddl.append(colName(col.getCanonicalName()) + " " + getHiveDataType(col.getDataType()) + "\n");
+        }
+        ddl.append(")" + "\n");
+
+        ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\177'" + "\n");
+        ddl.append("STORED AS SEQUENCEFILE" + "\n");
+        ddl.append("LOCATION '" + storageDfsDir + "/" + intermediateTableDesc.getTableName() + "';").append("\n");
+        // ddl.append("TBLPROPERTIES ('serialization.null.format'='\\\\N')" +
+        // ";\n");
+        return ddl.toString();
+    }
+
+    public static String generateDropTableStatement(IJoinedFlatTableDesc intermediateTableDesc) {
+        StringBuilder ddl = new StringBuilder();
+        ddl.append("DROP TABLE IF EXISTS " + intermediateTableDesc.getTableName() + ";").append("\n");
+        return ddl.toString();
+    }
+
+    public static String generateInsertDataStatement(IJoinedFlatTableDesc intermediateTableDesc, JobEngineConfig engineConfig) throws IOException {
+        StringBuilder sql = new StringBuilder();
+
+        File hadoopPropertiesFile = new File(engineConfig.getHiveConfFilePath());
+
+        if (hadoopPropertiesFile.exists()) {
+            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+            DocumentBuilder builder;
+            Document doc;
+            try {
+                builder = factory.newDocumentBuilder();
+                doc = builder.parse(hadoopPropertiesFile);
+                NodeList nl = doc.getElementsByTagName("property");
+                for (int i = 0; i < nl.getLength(); i++) {
+                    String name = doc.getElementsByTagName("name").item(i).getFirstChild().getNodeValue();
+                    String value = doc.getElementsByTagName("value").item(i).getFirstChild().getNodeValue();
+                    if (name.equals("tmpjars") == false) {
+                        sql.append("SET " + name + "=" + value + ";").append("\n");
+                    }
+                }
+
+            } catch (ParserConfigurationException e) {
+                throw new IOException(e);
+            } catch (SAXException e) {
+                throw new IOException(e);
+            }
+        }
+
+        sql.append("INSERT OVERWRITE TABLE " + intermediateTableDesc.getTableName() + " " + generateSelectDataStatement(intermediateTableDesc) + ";").append("\n");
+
+        return sql.toString();
+    }
+
+    public static String generateSelectDataStatement(IJoinedFlatTableDesc intermediateTableDesc) {
+        StringBuilder sql = new StringBuilder();
+        sql.append("SELECT" + "\n");
+        String tableAlias;
+        Map<String, String> tableAliasMap = buildTableAliasMap(intermediateTableDesc.getDataModel());
+        for (int i = 0; i < intermediateTableDesc.getColumnList().size(); i++) {
+            IntermediateColumnDesc col = intermediateTableDesc.getColumnList().get(i);
+            if (i > 0) {
+                sql.append(",");
+            }
+            tableAlias = tableAliasMap.get(col.getTableName());
+            sql.append(tableAlias + "." + col.getColumnName() + "\n");
+        }
+        appendJoinStatement(intermediateTableDesc, sql, tableAliasMap);
+        appendWhereStatement(intermediateTableDesc, sql, tableAliasMap);
+        return sql.toString();
+    }
+
+    private static Map<String, String> buildTableAliasMap(DataModelDesc dataModelDesc) {
+        Map<String, String> tableAliasMap = new HashMap<String, String>();
+
+        tableAliasMap.put(dataModelDesc.getFactTable().toUpperCase(), FACT_TABLE_ALIAS);
+
+        int i = 1;
+        for (LookupDesc lookupDesc: dataModelDesc.getLookups()) {
+            JoinDesc join = lookupDesc.getJoin();
+            if (join != null) {
+                tableAliasMap.put(lookupDesc.getTable().toUpperCase(), LOOKUP_TABLE_ALAIS_PREFIX + i);
+                i++;
+            }
+
+        }
+        return tableAliasMap;
+    }
+
+    private static void appendJoinStatement(IJoinedFlatTableDesc intermediateTableDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
+        Set<String> dimTableCache = new HashSet<String>();
+
+        DataModelDesc dataModelDesc = intermediateTableDesc.getDataModel();
+        String factTableName = dataModelDesc.getFactTable();
+        String factTableAlias = tableAliasMap.get(factTableName);
+        sql.append("FROM " + factTableName + " as " + factTableAlias + " \n");
+
+        for (LookupDesc lookupDesc : dataModelDesc.getLookups()) {
+            JoinDesc join = lookupDesc.getJoin();
+            if (join != null && join.getType().equals("") == false) {
+                String joinType = join.getType().toUpperCase();
+                String dimTableName = lookupDesc.getTable();
+                if (!dimTableCache.contains(dimTableName)) {
+                    TblColRef[] pk = join.getPrimaryKeyColumns();
+                    TblColRef[] fk = join.getForeignKeyColumns();
+                    if (pk.length != fk.length) {
+                        throw new RuntimeException("Invalid join condition of lookup table:" + lookupDesc);
+                    }
+                    sql.append(joinType + " JOIN " + dimTableName + " as " + tableAliasMap.get(dimTableName) + "\n");
+                    sql.append("ON ");
+                    for (int i = 0; i < pk.length; i++) {
+                        if (i > 0) {
+                            sql.append(" AND ");
+                        }
+                        sql.append(factTableAlias + "." + fk[i].getName() + " = " + tableAliasMap.get(dimTableName) + "." + pk[i].getName());
+                    }
+                    sql.append("\n");
+
+                    dimTableCache.add(dimTableName);
+                }
+            }
+        }
+    }
+
+    private static void appendWhereStatement(IJoinedFlatTableDesc intermediateTableDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
+        if (!(intermediateTableDesc instanceof CubeJoinedFlatTableDesc)) {
+            return;//TODO: for now only cube segments support filter and partition
+        }
+        CubeJoinedFlatTableDesc desc = (CubeJoinedFlatTableDesc) intermediateTableDesc;
+
+        boolean hasCondition = false;
+        StringBuilder whereBuilder = new StringBuilder();
+        whereBuilder.append("WHERE");
+
+        CubeDesc cubeDesc = desc.getCubeDesc();
+        DataModelDesc model = cubeDesc.getModel();
+        
+        if (model.getFilterCondition() != null && model.getFilterCondition().equals("") == false) {
+            whereBuilder.append(" (").append(model.getFilterCondition()).append(") ");
+            hasCondition = true;
+        }
+
+        CubeSegment cubeSegment = desc.getCubeSegment();
+
+        if (null != cubeSegment) {
+            PartitionDesc partDesc = model.getPartitionDesc();
+            long dateStart = cubeSegment.getDateRangeStart();
+            long dateEnd = cubeSegment.getDateRangeEnd();
+
+            if (!(dateStart == 0 && dateEnd == Long.MAX_VALUE)) {
+                whereBuilder.append(hasCondition ? " AND (" : " (");
+                whereBuilder.append(partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc, dateStart, dateEnd, tableAliasMap));
+                whereBuilder.append(")\n");
+                hasCondition = true;
+            }
+        }
+
+        if (hasCondition) {
+            sql.append(whereBuilder.toString());
+        }
+    }
+
+    private static String colName(String canonicalColName) {
+        return canonicalColName.replace(".", "_");
+    }
+    
+    private static String getHiveDataType(String javaDataType) {
+        String hiveDataType = javaDataType.toLowerCase().startsWith("varchar") ? "string" : javaDataType;
+        hiveDataType = javaDataType.toLowerCase().startsWith("integer") ? "int" : hiveDataType;
+
+        return hiveDataType.toLowerCase();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/Scheduler.java b/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
new file mode 100644
index 0000000..2ed2fc2
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.exception.SchedulerException;
+import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.lock.JobLock;
+
+/**
+ */
+public interface Scheduler<T extends Executable> {
+
+    void init(JobEngineConfig jobEngineConfig, JobLock jobLock) throws SchedulerException;
+
+    void shutdown() throws SchedulerException;
+
+    boolean stop(T executable) throws SchedulerException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java b/core-job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
new file mode 100644
index 0000000..29b5324
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.cmd;
+
+/**
+ */
+public abstract class BaseCommandOutput implements ICommandOutput {
+
+    @Override
+    public void log(String message) {
+        this.appendOutput(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java b/core-job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
new file mode 100644
index 0000000..6cab6a3
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.cmd;
+
+import org.apache.kylin.common.util.Logger;
+import org.apache.kylin.job.constant.JobStepStatusEnum;
+
+/**
+ * @author xjiang
+ * 
+ */
+public interface ICommandOutput extends Logger {
+
+    public void setStatus(JobStepStatusEnum status);
+
+    public JobStepStatusEnum getStatus();
+
+    public void appendOutput(String message);
+
+    public String getOutput();
+
+    public void setExitCode(int exitCode);
+
+    public int getExitCode();
+
+    public void reset();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java b/core-job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
new file mode 100644
index 0000000..5a47173
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.cmd;
+
+import org.apache.kylin.job.exception.JobException;
+
+/**
+ * @author xjiang
+ * 
+ */
+public interface IJobCommand {
+
+    public ICommandOutput execute() throws JobException;
+
+    public void cancel() throws JobException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java b/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
new file mode 100644
index 0000000..6a718fc
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.cmd;
+
+import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.job.constant.JobStepStatusEnum;
+import org.apache.kylin.job.exception.JobException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.*;
+
+/**
+ * @author xjiang
+ * 
+ */
+public class ShellCmd implements IJobCommand {
+
+    private static Logger log = LoggerFactory.getLogger(ShellCmd.class);
+
+    private final String executeCommand;
+    private final ICommandOutput output;
+    private final boolean isAsync;
+    private final CliCommandExecutor cliCommandExecutor;
+
+    private FutureTask<Integer> future;
+
+    private ShellCmd(String executeCmd, ICommandOutput out, String host, int port, String user, String password, boolean async) {
+        this.executeCommand = executeCmd;
+        this.output = out;
+        this.cliCommandExecutor = new CliCommandExecutor();
+        this.cliCommandExecutor.setRunAtRemote(host, port, user, password);
+        this.isAsync = async;
+    }
+
+    public ShellCmd(String executeCmd, String host, int port, String user, String password, boolean async) {
+        this(executeCmd, new ShellCmdOutput(), host, port, user, password, async);
+    }
+
+    @Override
+    public ICommandOutput execute() throws JobException {
+
+        final ExecutorService executor = Executors.newSingleThreadExecutor();
+        future = new FutureTask<Integer>(new Callable<Integer>() {
+            public Integer call() throws JobException, IOException {
+                executor.shutdown();
+                return executeCommand(executeCommand);
+            }
+        });
+        executor.execute(future);
+
+        int exitCode = -1;
+        if (!isAsync) {
+            try {
+                exitCode = future.get();
+                log.info("finish executing");
+            } catch (CancellationException e) {
+                log.debug("Command is cancelled");
+                exitCode = -2;
+            } catch (Exception e) {
+                throw new JobException("Error when execute job " + executeCommand, e);
+            } finally {
+                if (exitCode == 0) {
+                    output.setStatus(JobStepStatusEnum.FINISHED);
+                } else if (exitCode == -2) {
+                    output.setStatus(JobStepStatusEnum.DISCARDED);
+                } else {
+                    output.setStatus(JobStepStatusEnum.ERROR);
+                }
+                output.setExitCode(exitCode);
+            }
+        }
+        return output;
+    }
+
+    protected int executeCommand(String command) throws JobException, IOException {
+        output.reset();
+        output.setStatus(JobStepStatusEnum.RUNNING);
+        return cliCommandExecutor.execute(command, output).getFirst();
+    }
+
+    @Override
+    public void cancel() throws JobException {
+        future.cancel(true);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java b/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
new file mode 100644
index 0000000..ebcad47
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.cmd;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kylin.job.constant.JobStepStatusEnum;
+
+/**
+ * @author xjiang
+ * 
+ */
+public class ShellCmdOutput extends BaseCommandOutput implements ICommandOutput {
+
+    protected static final Logger log = LoggerFactory.getLogger(ShellCmdOutput.class);
+
+    protected StringBuilder output;
+    protected int exitCode;
+    protected JobStepStatusEnum status;
+
+    public ShellCmdOutput() {
+        init();
+    }
+
+    private void init() {
+        output = new StringBuilder();
+        exitCode = -1;
+        status = JobStepStatusEnum.NEW;
+    }
+
+    @Override
+    public JobStepStatusEnum getStatus() {
+        return status;
+    }
+
+    @Override
+    public void setStatus(JobStepStatusEnum s) {
+        this.status = s;
+    }
+
+    @Override
+    public String getOutput() {
+        return output.toString();
+    }
+
+    @Override
+    public void appendOutput(String message) {
+        output.append(message).append(System.getProperty("line.separator"));
+        log.debug(message);
+    }
+
+    @Override
+    public int getExitCode() {
+        return exitCode;
+    }
+
+    @Override
+    public void setExitCode(int code) {
+        exitCode = code;
+    }
+
+    @Override
+    public void reset() {
+        init();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/core-job/src/main/java/org/apache/kylin/job/common/OptionsHelper.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/common/OptionsHelper.java b/core-job/src/main/java/org/apache/kylin/job/common/OptionsHelper.java
new file mode 100644
index 0000000..1cda348
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/common/OptionsHelper.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.common;
+
+import java.io.File;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+
+/**
+ */
+public class OptionsHelper {
+    private CommandLine commandLine;
+
+    public void parseOptions(Options options, String[] args) throws ParseException {
+        CommandLineParser parser = new GnuParser();
+        commandLine = parser.parse(options, args);
+    }
+
+    public Option[] getOptions() {
+        return commandLine.getOptions();
+    }
+
+    public String getOptionsAsString() {
+        StringBuilder buf = new StringBuilder();
+        for (Option option : commandLine.getOptions()) {
+            buf.append(" ");
+            buf.append(option.getOpt());
+            if (option.hasArg()) {
+                buf.append("=");
+                buf.append(option.getValue());
+            }
+        }
+        return buf.toString();
+    }
+
+    public String getOptionValue(Option option) {
+        return commandLine.getOptionValue(option.getOpt());
+    }
+
+    public boolean hasOption(Option option) {
+        return commandLine.hasOption(option.getOpt());
+    }
+
+    public void printUsage(String programName, Options options) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp(programName, options);
+    }
+
+    public static String convertToFileURL(String path) {
+        if (File.separatorChar != '/') {
+            path = path.replace(File.separatorChar, '/');
+        }
+
+        return path;
+    }
+
+}