You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/07/27 11:21:44 UTC

[42/52] [abbrv] incubator-kylin git commit: KYLIN-875 Split job module into 'core-job', 'engine-mr', 'source-hive', 'storage-hbase'. The old job remains as an assembly project.

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java
new file mode 100644
index 0000000..0b812a6
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java
@@ -0,0 +1,186 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.cube.inmemcubing;
+
+import static org.junit.Assert.*;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
+import org.apache.kylin.cube.inmemcubing.InMemCubeBuilder;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks that {@link DoggedCubeBuilder} writes exactly the same cuboid records as
+ * {@link InMemCubeBuilder} when both are fed the same randomly generated rows.
+ */
+public class DoggedCubeBuilderTest extends LocalFileMetadataTestCase {
+
+    @SuppressWarnings("unused")
+    private static final Logger logger = LoggerFactory.getLogger(DoggedCubeBuilderTest.class);
+
+    private static final int INPUT_ROWS = 10000;
+    private static final int SPLIT_ROWS = 5000;
+    private static final int THREADS = 4;
+
+    private static CubeInstance cube;
+    private static String flatTable;
+    private static Map<TblColRef, Dictionary<?>> dictionaryMap;
+
+    @BeforeClass
+    public static void before() throws IOException {
+        staticCreateTestMetadata();
+
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+
+        cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
+        flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
+        dictionaryMap = InMemCubeBuilderTest.getDictionaryMap(cube, flatTable);
+    }
+
+    @AfterClass
+    public static void after() throws Exception {
+        staticCleanupTestMetadata();
+    }
+
+    /**
+     * Builds the same input once with the dogged builder and once with the plain in-memory
+     * builder, then compares the two outputs line by line. Both rounds pass the same random
+     * seed to the feeder so that the generated rows are identical.
+     */
+    @Test
+    public void test() throws Exception {
+
+        ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        long randSeed = System.currentTimeMillis();
+        FileRecordWriter doggedResult = null;
+        FileRecordWriter inmemResult = null;
+
+        try {
+            DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
+            doggedBuilder.setConcurrentThreads(THREADS);
+            doggedBuilder.setSplitRowThreshold(SPLIT_ROWS);
+            doggedResult = new FileRecordWriter();
+
+            {
+                Future<?> future = executorService.submit(doggedBuilder.buildAsRunnable(queue, doggedResult));
+                InMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
+                future.get();
+                doggedResult.close();
+            }
+
+            InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
+            inmemBuilder.setConcurrentThreads(THREADS);
+            inmemResult = new FileRecordWriter();
+
+            {
+                Future<?> future = executorService.submit(inmemBuilder.buildAsRunnable(queue, inmemResult));
+                InMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
+                future.get();
+                inmemResult.close();
+            }
+
+            fileCompare(doggedResult.file, inmemResult.file);
+        } finally {
+            // always stop the worker thread and remove the temp files, even when a build or the comparison fails
+            executorService.shutdownNow();
+            discard(doggedResult);
+            discard(inmemResult);
+        }
+    }
+
+    /** Closes the given writer and deletes its temp file; a null argument is ignored. */
+    private static void discard(FileRecordWriter result) {
+        if (result != null) {
+            result.close();
+            result.file.delete();
+        }
+    }
+
+    /**
+     * Asserts that the two files have the same lines in the same order (and the same number of
+     * lines). Both readers are closed even when an assertion fails.
+     */
+    private void fileCompare(File file, File file2) throws IOException {
+        BufferedReader r1 = new BufferedReader(new InputStreamReader(new FileInputStream(file), "UTF-8"));
+        try {
+            BufferedReader r2 = new BufferedReader(new InputStreamReader(new FileInputStream(file2), "UTF-8"));
+            try {
+                String line1, line2;
+                do {
+                    line1 = r1.readLine();
+                    line2 = r2.readLine();
+                    assertEquals(line1, line2);
+                } while (line1 != null || line2 != null);
+            } finally {
+                r2.close();
+            }
+        } finally {
+            r1.close();
+        }
+    }
+
+    /** Cuboid writer that appends every record as one text line, "cuboidId, record", to a temp file. */
+    static class FileRecordWriter implements ICuboidWriter {
+
+        File file;
+        PrintWriter writer;
+
+        FileRecordWriter() throws IOException {
+            file = File.createTempFile("DoggedCubeBuilderTest_", ".data");
+            writer = new PrintWriter(file, "UTF-8");
+        }
+
+        @Override
+        public void write(long cuboidId, GTRecord record) throws IOException {
+            writer.print(cuboidId);
+            writer.print(", ");
+            writer.print(record.toString());
+            writer.println();
+        }
+
+        public void close() {
+            writer.close();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
new file mode 100644
index 0000000..1487dff
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
@@ -0,0 +1,216 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.cube.inmemcubing;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Exercises {@link InMemCubeBuilder} with randomly generated rows. Also hosts the package-private
+ * helpers for feeding rows and building dictionaries from a flat table CSV file.
+ */
+public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
+
+    private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderTest.class);
+
+    private static final int INPUT_ROWS = 70000;
+    private static final int THREADS = 4;
+
+    private static CubeInstance cube;
+    private static String flatTable;
+    private static Map<TblColRef, Dictionary<?>> dictionaryMap;
+
+    @BeforeClass
+    public static void before() throws IOException {
+        staticCreateTestMetadata();
+
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+
+        cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
+        flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
+        dictionaryMap = getDictionaryMap(cube, flatTable);
+    }
+
+    @AfterClass
+    public static void after() throws Exception {
+        staticCleanupTestMetadata();
+    }
+
+    /**
+     * Runs three consecutive builds on one builder instance: full input, zero input, then full
+     * input again. Any exception is logged and rethrown wrapped in an IOException.
+     */
+    @Test
+    public void test() throws Exception {
+
+        InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
+        cubeBuilder.setConcurrentThreads(THREADS);
+
+        ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        int[] rowsPerRound = { INPUT_ROWS, 0, INPUT_ROWS };
+
+        try {
+            for (int rows : rowsPerRound) {
+                Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
+                feedData(cube, flatTable, queue, rows);
+                future.get();
+            }
+        } catch (Exception e) {
+            logger.error("stream build failed", e);
+            throw new IOException("Failed to build cube ", e);
+        } finally {
+            // do not leave the worker thread behind, whether the rounds succeeded or not
+            executorService.shutdownNow();
+        }
+    }
+
+    /** Same as the five-argument overload with a seed of 0, i.e. without fixing the random seed. */
+    static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count) throws IOException, InterruptedException {
+        feedData(cube, flatTable, queue, count, 0);
+    }
+
+    /**
+     * Puts {@code count} random rows into the queue, followed by one empty list (presumably the
+     * end-of-input marker expected by the builders). Every cell is drawn at random from the
+     * distinct values found in the same column of the flat table file.
+     *
+     * @param randSeed seed for the random generator; 0 means the generator is left unseeded
+     */
+    static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count, long randSeed) throws IOException, InterruptedException {
+        CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), null);
+        int nColumns = flatTableDesc.getColumnList().size();
+
+        @SuppressWarnings("unchecked")
+        Set<String>[] distinctSets = new Set[nColumns];
+        for (int i = 0; i < nColumns; i++)
+            distinctSets[i] = new TreeSet<String>();
+
+        // get distinct values on each column
+        List<String> lines = FileUtils.readLines(new File(flatTable), "UTF-8");
+        for (String line : lines) {
+            String[] row = line.trim().split(",");
+            assert row.length == nColumns;
+            for (int i = 0; i < nColumns; i++)
+                distinctSets[i].add(row[i]);
+        }
+
+        List<String[]> distincts = new ArrayList<String[]>();
+        for (int i = 0; i < nColumns; i++) {
+            distincts.add(distinctSets[i].toArray(new String[distinctSets[i].size()]));
+        }
+
+        Random rand = new Random();
+        if (randSeed != 0)
+            rand.setSeed(randSeed);
+
+        // output with random data
+        for (; count > 0; count--) {
+            ArrayList<String> row = new ArrayList<String>(nColumns);
+            for (int i = 0; i < nColumns; i++) {
+                String[] candidates = distincts.get(i);
+                row.add(candidates[rand.nextInt(candidates.length)]);
+            }
+            queue.put(row);
+        }
+        queue.put(new ArrayList<String>(0));
+    }
+
+    /**
+     * Builds a dictionary for every base cuboid column that the row key reports as using a
+     * dictionary, from the values of the corresponding flat table column.
+     */
+    static Map<TblColRef, Dictionary<?>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException {
+        Map<TblColRef, Dictionary<?>> result = Maps.newHashMap();
+        CubeDesc desc = cube.getDescriptor();
+        CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(desc, null);
+        int nColumns = flatTableDesc.getColumnList().size();
+        List<String> lines = null; // flat table content, read at most once and shared by all columns
+
+        List<TblColRef> columns = Cuboid.getBaseCuboid(desc).getColumns();
+        for (int c = 0; c < columns.size(); c++) {
+            TblColRef col = columns.get(c);
+            if (desc.getRowkey().isUseDictionary(col)) {
+                logger.info("Building dictionary for {}", col);
+                if (lines == null) {
+                    lines = FileUtils.readLines(new File(flatTable), "UTF-8");
+                }
+                List<byte[]> valueList = readValueList(lines, nColumns, flatTableDesc.getRowKeyColumnIndexes()[c]);
+                Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(col.getType(), valueList);
+                result.put(col, dict);
+            }
+        }
+        return result;
+    }
+
+    /** Extracts column {@code c} of every line as bytes; each line is trimmed and split on commas. */
+    private static List<byte[]> readValueList(List<String> lines, int nColumns, int c) {
+        List<byte[]> result = Lists.newArrayList();
+        for (String line : lines) {
+            String[] row = line.trim().split(",");
+            assert row.length == nColumns;
+            result.add(Bytes.toBytes(row[c]));
+        }
+        return result;
+    }
+
+    /** Cuboid writer that discards records, or prints them to stdout when {@code verbose} is set. */
+    static class ConsoleGTRecordWriter implements ICuboidWriter {
+
+        boolean verbose = false;
+
+        @Override
+        public void write(long cuboidId, GTRecord record) throws IOException {
+            if (verbose)
+                System.out.println(record.toString());
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
new file mode 100644
index 0000000..645908d
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
@@ -0,0 +1,131 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.cube.inmemcubing;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.cube.inmemcubing.MemDiskStore;
+import org.apache.kylin.gridtable.GTBuilder;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GridTable;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.gridtable.UnitTestSupport;
+import org.junit.Test;
+
+/**
+ * Writes mock records into a {@link GridTable} backed by {@link MemDiskStore} and checks that
+ * scanning the table returns them unchanged, from one thread and from several concurrent threads.
+ */
+public class MemDiskStoreTest {
+
+    final MemoryBudgetController budgetCtrl = new MemoryBudgetController(20);
+    final GTInfo info = UnitTestSupport.advancedInfo();
+    final List<GTRecord> data = UnitTestSupport.mockupData(info, 1000000); // converts to about 34 MB data
+
+    @Test
+    public void testSingleThreadWriteRead() throws IOException {
+        long start = System.currentTimeMillis();
+        verifyOneTableWriteAndRead();
+        long end = System.currentTimeMillis();
+        System.out.println("Cost " + (end - start) + " millis");
+    }
+
+    /**
+     * Runs the write/read verification in several threads that share one budget controller.
+     * Whatever a worker throws, assertion failures included, is captured and rethrown from the
+     * test thread so that a failing worker fails the test.
+     */
+    @Test
+    public void testMultiThreadWriteRead() throws IOException, InterruptedException {
+        long start = System.currentTimeMillis();
+
+        int nThreads = 5;
+        Thread[] t = new Thread[nThreads];
+        // one slot per worker; Thread.join() below makes each worker's write visible to this thread
+        final Throwable[] failures = new Throwable[nThreads];
+        for (int i = 0; i < nThreads; i++) {
+            final int worker = i;
+            t[i] = new Thread() {
+                @Override
+                public void run() {
+                    try {
+                        verifyOneTableWriteAndRead();
+                    } catch (Throwable ex) {
+                        failures[worker] = ex;
+                    }
+                }
+            };
+            t[i].start();
+        }
+        for (int i = 0; i < nThreads; i++) {
+            t[i].join();
+        }
+        for (int i = 0; i < nThreads; i++) {
+            if (failures[i] != null) {
+                AssertionError error = new AssertionError("Worker thread " + i + " failed: " + failures[i]);
+                error.initCause(failures[i]);
+                throw error;
+            }
+        }
+
+        long end = System.currentTimeMillis();
+        System.out.println("Cost " + (end - start) + " millis");
+    }
+
+    /** Creates a fresh store and table over the shared budget controller and verifies one write/read cycle. */
+    private void verifyOneTableWriteAndRead() throws IOException {
+        MemDiskStore store = new MemDiskStore(info, budgetCtrl);
+        GridTable table = new GridTable(info, store);
+        verifyWriteAndRead(table);
+    }
+
+    /**
+     * Rebuilds the table from the mock data, scans it back and asserts that the scan yields
+     * exactly the written records, in order. Builder and scanner are closed even on failure.
+     */
+    private void verifyWriteAndRead(GridTable table) throws IOException {
+        GTInfo info = table.getInfo();
+
+        GTBuilder builder = table.rebuild();
+        try {
+            for (GTRecord r : data) {
+                builder.write(r);
+            }
+        } finally {
+            builder.close();
+        }
+
+        IGTScanner scanner = table.scan(new GTScanRequest(info));
+        try {
+            int i = 0;
+            for (GTRecord r : scanner) {
+                assertEquals(data.get(i++), r);
+            }
+            // without this check a scan returning too few (or no) records would pass unnoticed
+            assertEquals(data.size(), i);
+        } finally {
+            scanner.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemoryBudgetControllerTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemoryBudgetControllerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemoryBudgetControllerTest.java
new file mode 100644
index 0000000..f7bf432
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemoryBudgetControllerTest.java
@@ -0,0 +1,107 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.cube.inmemcubing;
+
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.common.util.MemoryBudgetController.NotEnoughBudgetException;
+import org.junit.Test;
+
+import java.util.ArrayList;
+
+import static org.junit.Assert.*;
+
+/**
+ * Exercises reserving, freeing up and waiting for budget on {@link MemoryBudgetController}.
+ */
+public class MemoryBudgetControllerTest {
+
+    /**
+     * Fills the budget with 1 MB consumers, lets one big reservation displace them, and checks
+     * that an insisting reservation has to wait (more than a second) until the big one is released.
+     */
+    @Test
+    public void test() {
+        final int budgetMB = MemoryBudgetController.getSystemAvailMB() / 2;
+        final MemoryBudgetController mbc = new MemoryBudgetController(budgetMB);
+
+        ArrayList<Consumer> smallConsumers = new ArrayList<Consumer>();
+        while (smallConsumers.size() < budgetMB) {
+            smallConsumers.add(new Consumer(mbc));
+            assertEquals(smallConsumers.size(), mbc.getTotalReservedMB());
+        }
+
+        // a's reservation will free up all the previous
+        final Consumer a = new Consumer();
+        mbc.reserve(a, budgetMB);
+        for (Consumer small : smallConsumers) {
+            assertNull(small.data);
+        }
+
+        // cancel a in 2 seconds
+        Thread canceller = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    Thread.sleep(2000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+                mbc.reserve(a, 0);
+            }
+        });
+        canceller.start();
+
+        // b will succeed after some wait
+        final long bWaitStart = System.currentTimeMillis();
+        final Consumer b = new Consumer();
+        mbc.reserveInsist(b, budgetMB);
+        final long waited = System.currentTimeMillis() - bWaitStart;
+        assertTrue(waited > 1000);
+
+        try {
+            mbc.reserve(a, 1);
+            fail();
+        } catch (NotEnoughBudgetException expected) {
+            // expected
+        }
+    }
+
+    /** Consumer that, when built with a controller, reserves 1 MB and holds about 1 MB of data until asked to free up. */
+    class Consumer implements MemoryBudgetController.MemoryConsumer {
+
+        byte[] data;
+
+        Consumer() {
+        }
+
+        Consumer(MemoryBudgetController mbc) {
+            mbc.reserve(this, 1);
+            data = new byte[MemoryBudgetController.ONE_MB - 24]; // 24 is object shell of this + object shell of data + reference of data 
+        }
+
+        @Override
+        public int freeUp(int mb) {
+            if (data == null) {
+                return 0;
+            }
+            data = null;
+            return 1;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java
new file mode 100644
index 0000000..f53f11f
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java
@@ -0,0 +1,217 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+import java.math.BigDecimal;
+import java.util.Comparator;
+import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.gridtable.GTAggregateScanner;
+import org.apache.kylin.metadata.measure.BigDecimalSumAggregator;
+import org.apache.kylin.metadata.measure.DoubleSumAggregator;
+import org.apache.kylin.metadata.measure.DoubleMutable;
+import org.apache.kylin.metadata.measure.HLLCAggregator;
+import org.apache.kylin.metadata.measure.LongSumAggregator;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.junit.Test;
+
+/**
+ * Prints rough per-object heap size estimates for aggregators and aggregation cache entries,
+ * obtained by comparing the available heap before and after allocating many instances.
+ */
+public class AggregationCacheMemSizeTest {
+
+    public static final int NUM_OF_OBJS = 1000000 / 2;
+
+    /** Factory invoked once per sampled object. */
+    interface CreateAnObject {
+        Object create();
+    }
+
+    @Test
+    public void testHLLCAggregatorSize() throws InterruptedException {
+        CreateAnObject factory = new CreateAnObject() {
+            @Override
+            public Object create() {
+                HLLCAggregator hllc = new HLLCAggregator(10);
+                hllc.aggregate(new HyperLogLogPlusCounter(10));
+                return hllc;
+            }
+        };
+        System.out.println("HLLC: " + estimateObjectSize(factory));
+    }
+
+    @Test
+    public void testBigDecimalAggregatorSize() throws InterruptedException {
+        CreateAnObject factory = new CreateAnObject() {
+            @Override
+            public Object create() {
+                return newBigDecimalAggr();
+            }
+        };
+        System.out.println("BigDecimal: " + estimateObjectSize(factory));
+    }
+
+    private BigDecimalSumAggregator newBigDecimalAggr() {
+        BigDecimalSumAggregator sum = new BigDecimalSumAggregator();
+        sum.aggregate(new BigDecimal("12345678901234567890.123456789"));
+        return sum;
+    }
+
+    @Test
+    public void testLongAggregatorSize() throws InterruptedException {
+        CreateAnObject factory = new CreateAnObject() {
+            @Override
+            public Object create() {
+                return newLongAggr();
+            }
+        };
+        System.out.println("Long: " + estimateObjectSize(factory));
+    }
+
+    private LongSumAggregator newLongAggr() {
+        LongSumAggregator sum = new LongSumAggregator();
+        sum.aggregate(new LongMutable(10));
+        return sum;
+    }
+
+    @Test
+    public void testDoubleAggregatorSize() throws InterruptedException {
+        CreateAnObject factory = new CreateAnObject() {
+            @Override
+            public Object create() {
+                return newDoubleAggr();
+            }
+        };
+        System.out.println("Double: " + estimateObjectSize(factory));
+    }
+
+    private DoubleSumAggregator newDoubleAggr() {
+        DoubleSumAggregator sum = new DoubleSumAggregator();
+        sum.aggregate(new DoubleMutable(10));
+        return sum;
+    }
+
+    @Test
+    public void testByteArraySize() throws InterruptedException {
+        CreateAnObject factory = new CreateAnObject() {
+            @Override
+            public Object create() {
+                return new byte[10];
+            }
+        };
+        System.out.println("byte[10]: " + estimateObjectSize(factory));
+    }
+
+    @Test
+    public void testAggregatorArraySize() throws InterruptedException {
+        CreateAnObject factory = new CreateAnObject() {
+            @Override
+            public Object create() {
+                return new MeasureAggregator[7];
+            }
+        };
+        System.out.println("MeasureAggregator[7]: " + estimateObjectSize(factory));
+    }
+
+    @Test
+    public void testTreeMapSize() throws InterruptedException {
+        final SortedMap<byte[], Object> map = newByteArrayKeyMap();
+        final Random rand = new Random();
+        CreateAnObject factory = new CreateAnObject() {
+            @Override
+            public Object create() {
+                byte[] randomKey = new byte[10];
+                rand.nextBytes(randomKey);
+                map.put(randomKey, null);
+                return null;
+            }
+        };
+        int est = estimateObjectSize(factory);
+        System.out.println("TreeMap entry: " + (est - 20)); // -20 is to exclude byte[10]
+    }
+
+    @Test
+    public void testAggregationCacheSize() throws InterruptedException {
+        final SortedMap<byte[], Object> map = newByteArrayKeyMap();
+        final Random rand = new Random();
+
+        final long bytesBefore = memLeft();
+        byte[] key = null;
+        MeasureAggregator<?>[] aggrs = null;
+        for (int remaining = NUM_OF_OBJS; remaining > 0; remaining--) {
+            key = new byte[10];
+            rand.nextBytes(key);
+            aggrs = new MeasureAggregator[] { newBigDecimalAggr(), newLongAggr(), newDoubleAggr(), newDoubleAggr() };
+            map.put(key, aggrs);
+        }
+        final long bytesAfter = memLeft();
+
+        long mapActualSize = bytesBefore - bytesAfter;
+        long mapExpectSize = GTAggregateScanner.estimateSizeOfAggrCache(key, aggrs, map.size());
+        System.out.println("Actual cache size: " + mapActualSize);
+        System.out.println("Expect cache size: " + mapExpectSize);
+    }
+
+    /** Creates an empty sorted map whose byte[] keys are ordered by {@code Bytes.compareTo}. */
+    private static SortedMap<byte[], Object> newByteArrayKeyMap() {
+        return new TreeMap<byte[], Object>(new Comparator<byte[]>() {
+            @Override
+            public int compare(byte[] o1, byte[] o2) {
+                return Bytes.compareTo(o1, o2);
+            }
+        });
+    }
+
+    /**
+     * Calls the factory NUM_OF_OBJS times, keeping every result referenced, and returns the drop
+     * in available heap divided by the number of calls.
+     */
+    private int estimateObjectSize(CreateAnObject factory) throws InterruptedException {
+        final Object[] hold = new Object[NUM_OF_OBJS];
+        final long bytesBefore = memLeft();
+
+        int filled = 0;
+        while (filled < hold.length) {
+            hold[filled++] = factory.create();
+        }
+
+        final long bytesAfter = memLeft();
+        return (int) ((bytesBefore - bytesAfter) / hold.length);
+    }
+
+    /** Requests a garbage collection, sleeps half a second, then reports the available heap bytes. */
+    private long memLeft() throws InterruptedException {
+        Runtime.getRuntime().gc();
+        Thread.sleep(500);
+        return getSystemAvailBytes();
+    }
+
+    /** Returns the max heap (e.g. the Xmx setting) minus the part of the current heap that is in use. */
+    private long getSystemAvailBytes() {
+        final Runtime vm = Runtime.getRuntime();
+        final long usedMemory = vm.totalMemory() - vm.freeMemory(); // current heap minus its free part
+        return vm.maxMemory() - usedMemory;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
new file mode 100644
index 0000000..c90c5d3
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
@@ -0,0 +1,381 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.cube.gridtable.CubeCodeSystem;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.NumberDictionaryBuilder;
+import org.apache.kylin.dict.StringBytesConverter;
+import org.apache.kylin.dict.TrieDictionaryBuilder;
+import org.apache.kylin.gridtable.GTInfo.Builder;
+import org.apache.kylin.gridtable.memstore.GTSimpleMemStore;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.ExtractTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class DictGridTableTest {
+
+    @Test
+    public void test() throws IOException { // builds one table and runs every verification against it, in this order
+        GridTable table = newTestTable();
+        verifyScanRangePlanner(table);
+        verifyFirstRow(table);
+        verifyScanWithUnevaluatableFilter(table);
+        verifyScanWithEvaluatableFilter(table);
+        verifyConvertFilterConstants1(table);
+        verifyConvertFilterConstants2(table);
+        verifyConvertFilterConstants3(table);
+        verifyConvertFilterConstants4(table);
+    }
+
+    private void verifyScanRangePlanner(GridTable table) { // checks the scan ranges GTScanRangePlanner derives from several filter shapes
+        GTInfo info = table.getInfo();
+        GTScanRangePlanner planner = new GTScanRangePlanner(info);
+        // building blocks: column 0 compared with encoded dates, column 1 compared with encoded ages
+        CompareTupleFilter timeComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
+        CompareTupleFilter timeComp2 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-13"));
+        CompareTupleFilter timeComp3 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-15"));
+        CompareTupleFilter timeComp4 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-15"));
+        CompareTupleFilter ageComp1 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "10"));
+        CompareTupleFilter ageComp2 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "20"));
+        CompareTupleFilter ageComp3 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "30"));
+        CompareTupleFilter ageComp4 = compare(info.colRef(1), FilterOperatorEnum.NEQ, enc(info, 1, "30"));
+        
+        // flatten or-and & hbase fuzzy value
+        {
+            LogicalTupleFilter filter = and(timeComp1, or(ageComp1, ageComp2));
+            List<GTScanRange> r = planner.planScanRanges(filter);
+            assertEquals(1, r.size()); // both age alternatives are expected inside a single range
+            assertEquals("[1421193600000, 10]-[null, null]", r.get(0).toString());
+            assertEquals("[[10], [20]]", r.get(0).hbaseFuzzyKeys.toString()); // the two ages are expected as fuzzy keys of that range
+        }
+        
+        // pre-evaluate ever false
+        {
+            LogicalTupleFilter filter = and(timeComp1, timeComp2); // time > 2015-01-14 AND time < 2015-01-13
+            List<GTScanRange> r = planner.planScanRanges(filter);
+            assertEquals(0, r.size()); // no range expected at all
+        }
+        
+        // pre-evaluate ever true
+        {
+            LogicalTupleFilter filter = or(timeComp1, ageComp4);
+            List<GTScanRange> r = planner.planScanRanges(filter);
+            assertEquals("[[null, null]-[null, null]]", r.toString()); // one range with null bounds on both ends
+        }
+        
+        // merge overlap range
+        {
+            LogicalTupleFilter filter = or(timeComp1, timeComp3); // time > 2015-01-14 OR time < 2015-01-15
+            List<GTScanRange> r = planner.planScanRanges(filter);
+            assertEquals("[[null, null]-[null, null]]", r.toString());
+        }
+        
+        // merge too many ranges
+        {
+            LogicalTupleFilter filter = or(and(timeComp4, ageComp1), and(timeComp4, ageComp2), and(timeComp4, ageComp3));
+            List<GTScanRange> r = planner.planScanRanges(filter);
+            assertEquals(3, r.size()); // without a limit: one point range per (time, age) pair
+            assertEquals("[1421280000000, 10]-[1421280000000, 10]", r.get(0).toString());
+            assertEquals("[1421280000000, 20]-[1421280000000, 20]", r.get(1).toString());
+            assertEquals("[1421280000000, 30]-[1421280000000, 30]", r.get(2).toString());
+            List<GTScanRange> r2 = planner.planScanRanges(filter, 2); // second argument presumably caps the number of ranges - TODO confirm
+            assertEquals("[[1421280000000, 10]-[1421280000000, 30]]", r2.toString()); // the three ranges are expected merged into one span
+        }
+    }
+
+    private void verifyFirstRow(GridTable table) throws IOException { // scan built from the table info only; just the first record is compared
+        doScanAndVerify(table, new GTScanRequest(table.getInfo()), "[1421193600000, 30, Yang, 10, 10.5]");
+    }
+
+    private void verifyScanWithUnevaluatableFilter(GridTable table) throws IOException { // aggregated scan whose filter mixes a comparison with EXTRACT-based children
+        GTInfo info = table.getInfo();
+
+        CompareTupleFilter fComp = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
+        ExtractTupleFilter fUnevaluatable = unevaluatable(info.colRef(1));
+        LogicalTupleFilter fNotPlusUnevaluatable = not(unevaluatable(info.colRef(1)));
+        LogicalTupleFilter filter = and(fComp, fUnevaluatable, fNotPlusUnevaluatable);
+
+        GTScanRequest req = new GTScanRequest(info, null, setOf(0), setOf(3), new String[] { "sum" }, filter); // setOf(0) / setOf(3) / "sum" surface as aggrGroupBy, aggrMetrics, aggrMetricsFuncs below
+
+        // note the unEvaluatable column 1 in filter is added to group by
+        assertEquals("GTScanRequest [range=[null, null]-[null, null], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [null], [null]], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
+        // only the first returned record is compared; the two EXTRACT children appear as [null] in filterPushDown above
+        doScanAndVerify(table, req, "[1421280000000, 20, null, 20, null]");
+    }
+    
+    private void verifyScanWithEvaluatableFilter(GridTable table) throws IOException { // aggregated scan filtered by two plain comparisons
+        GTInfo info = table.getInfo();
+
+        CompareTupleFilter fComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
+        CompareTupleFilter fComp2 = compare(info.colRef(1), FilterOperatorEnum.GT, enc(info, 1, "10"));
+        LogicalTupleFilter filter = and(fComp1, fComp2);
+
+        GTScanRequest req = new GTScanRequest(info, null, setOf(0), setOf(3), new String[] { "sum" }, filter); // setOf(0) / setOf(3) / "sum" surface as aggrGroupBy, aggrMetrics, aggrMetricsFuncs below
+        
+        // note the evaluatable column 1 in filter is added to returned columns but not in group by
+        assertEquals("GTScanRequest [range=[null, null]-[null, null], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
+        // the first two returned records are compared: sums 30 and 40 in column 3
+        doScanAndVerify(table, req, "[1421280000000, 20, null, 30, null]", "[1421366400000, 20, null, 40, null]");
+    }
+
+    private void verifyConvertFilterConstants1(GridTable table) { // EQ on a value present in the integer dictionary ("10")
+        GTInfo info = table.getInfo();
+        // mock an external table with columns A (timestamp) and B (integer)
+        TableDesc extTable = TableDesc.mockup("ext");
+        TblColRef extColA = new TblColRef(ColumnDesc.mockup(extTable, 1, "A", "timestamp"));
+        TblColRef extColB = new TblColRef(ColumnDesc.mockup(extTable, 2, "B", "integer"));
+
+        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14"); // constants are plain strings here, not pre-encoded
+        CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.EQ, "10");
+        LogicalTupleFilter filter = and(fComp1, fComp2);
+        // presumably position i in colMapping corresponds to grid-table column i - TODO confirm
+        List<TblColRef> colMapping = Lists.newArrayList();
+        colMapping.add(extColA);
+        colMapping.add(extColB);
+        // expected: columns renamed to GT_MOCKUP_TABLE.0/.1 and constants shown as encoded bytes
+        TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+        assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 EQ [\\x00]]", newFilter.toString());
+    }
+
+    private void verifyConvertFilterConstants2(GridTable table) { // LT on "9", a value the integer dictionary does not contain
+        GTInfo info = table.getInfo();
+        // mock an external table with columns A (timestamp) and B (integer)
+        TableDesc extTable = TableDesc.mockup("ext");
+        TblColRef extColA = new TblColRef(ColumnDesc.mockup(extTable, 1, "A", "timestamp"));
+        TblColRef extColB = new TblColRef(ColumnDesc.mockup(extTable, 2, "B", "integer"));
+        
+        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+        CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.LT, "9");
+        LogicalTupleFilter filter = and(fComp1, fComp2);
+        // presumably position i in colMapping corresponds to grid-table column i - TODO confirm
+        List<TblColRef> colMapping = Lists.newArrayList();
+        colMapping.add(extColA);
+        colMapping.add(extColB);
+        
+        // $1<"9" round up to $1<"10"
+        TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+        assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 LT [\\x00]]", newFilter.toString());
+    }
+    
+    private void verifyConvertFilterConstants3(GridTable table) { // LTE on "9", a value the integer dictionary does not contain
+        GTInfo info = table.getInfo();
+        // mock an external table with columns A (timestamp) and B (integer)
+        TableDesc extTable = TableDesc.mockup("ext");
+        TblColRef extColA = new TblColRef(ColumnDesc.mockup(extTable, 1, "A", "timestamp"));
+        TblColRef extColB = new TblColRef(ColumnDesc.mockup(extTable, 2, "B", "integer"));
+        
+        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+        CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.LTE, "9");
+        LogicalTupleFilter filter = and(fComp1, fComp2);
+        // presumably position i in colMapping corresponds to grid-table column i - TODO confirm
+        List<TblColRef> colMapping = Lists.newArrayList();
+        colMapping.add(extColA);
+        colMapping.add(extColB);
+        
+        // $1<="9" round down to FALSE
+        TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+        assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], []]", newFilter.toString()); // the second child is expected to print as []
+    }
+    
+    private void verifyConvertFilterConstants4(GridTable table) { // IN list where only one value ("10") is present in the integer dictionary
+        GTInfo info = table.getInfo();
+        // mock an external table with columns A (timestamp) and B (integer)
+        TableDesc extTable = TableDesc.mockup("ext");
+        TblColRef extColA = new TblColRef(ColumnDesc.mockup(extTable, 1, "A", "timestamp"));
+        TblColRef extColB = new TblColRef(ColumnDesc.mockup(extTable, 2, "B", "integer"));
+        
+        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+        CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.IN, "9", "10", "15");
+        LogicalTupleFilter filter = and(fComp1, fComp2);
+        // presumably position i in colMapping corresponds to grid-table column i - TODO confirm
+        List<TblColRef> colMapping = Lists.newArrayList();
+        colMapping.add(extColA);
+        colMapping.add(extColB);
+        
+        // $1 in ("9", "10", "15") has only "10" left
+        TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+        assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 IN [\\x00]]", newFilter.toString());
+    }
+    
+    private void doScanAndVerify(GridTable table, GTScanRequest req, String... verifyRows) throws IOException { // runs req, prints every record, compares the leading records with verifyRows
+        System.out.println(req);
+        IGTScanner scanner = table.scan(req);
+        int i = 0;
+        try { // close in finally so a failed assertion no longer leaves the scanner open
+            for (GTRecord r : scanner) {
+                System.out.println(r);
+                if (verifyRows != null && i < verifyRows.length) // records beyond verifyRows.length are printed but not compared
+                    assertEquals(verifyRows[i], r.toString());
+                i++;
+            }
+        } finally { scanner.close(); }
+    }
+
+    private Object enc(GTInfo info, int col, String value) { // encodes value for column col with the table's code system; result is a ByteArray
+        ByteBuffer buf = ByteBuffer.allocate(info.getMaxColumnLength()); // presumably large enough for any single encoded column - TODO confirm
+        info.codeSystem.encodeColumnValue(col, value, buf);
+        return ByteArray.copyOf(buf.array(), buf.arrayOffset(), buf.position()); // copy only the bytes written so far
+    }
+
+    private ExtractTupleFilter unevaluatable(TblColRef col) { // EXTRACT filter over col; the tests use it as a filter the scan cannot evaluate
+        ExtractTupleFilter r = new ExtractTupleFilter(FilterOperatorEnum.EXTRACT);
+        r.addChild(new ColumnTupleFilter(col)); // the column is the filter's only child
+        return r;
+    }
+
+    private CompareTupleFilter compare(TblColRef col, FilterOperatorEnum op, Object... value) { // builds "col op value(s)"
+        CompareTupleFilter result = new CompareTupleFilter(op);
+        result.addChild(new ColumnTupleFilter(col)); // first child: the column
+        result.addChild(new ConstantTupleFilter(Arrays.asList(value))); // second child: all constants in one ConstantTupleFilter
+        return result;
+    }
+
+    private LogicalTupleFilter and(TupleFilter... children) { // AND over the given children
+        return logic(FilterOperatorEnum.AND, children);
+    }
+
+    private LogicalTupleFilter or(TupleFilter... children) { // OR over the given children
+        return logic(FilterOperatorEnum.OR, children);
+    }
+
+    private LogicalTupleFilter not(TupleFilter child) { // NOT with a single child
+        return logic(FilterOperatorEnum.NOT, child);
+    }
+
+    private LogicalTupleFilter logic(FilterOperatorEnum op, TupleFilter... children) { // shared builder behind and/or/not
+        LogicalTupleFilter result = new LogicalTupleFilter(op);
+        for (TupleFilter c : children) { // children are attached in argument order
+            result.addChild(c);
+        }
+        return result;
+    }
+
+    static GridTable newTestTable() throws IOException { // in-memory table (GTSimpleMemStore) filled with ten fixed rows
+        GTInfo info = newInfo();
+        GTSimpleMemStore store = new GTSimpleMemStore(info);
+        GridTable table = new GridTable(info, store);
+
+        GTRecord r = new GTRecord(table.getInfo()); // one record object, refilled via setValues for every write
+        GTBuilder builder = table.rebuild();
+        // columns: date, age, name, a LongMutable metric (always 10), a BigDecimal metric (always 10.5)
+        builder.write(r.setValues("2015-01-14", "30", "Yang", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-14", "30", "Luke", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-15", "20", "Dong", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-15", "20", "Jason", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-15", "30", "Xu", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-16", "20", "Mahone", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-16", "20", "Qianhao", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-16", "30", "George", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-17", "10", "Kejia", new LongMutable(10), new BigDecimal("10.5")));
+        builder.close();
+
+        return table;
+    }
+
+    static GTInfo newInfo() { // five-column GTInfo backed by the dictionary code system from newDictCodeSystem()
+        Builder builder = GTInfo.builder();
+        builder.setCodeSystem(newDictCodeSystem());
+        builder.setColumns( //
+                DataType.getInstance("timestamp"), //
+                DataType.getInstance("integer"), //
+                DataType.getInstance("varchar(10)"), //
+                DataType.getInstance("bigint"), //
+                DataType.getInstance("decimal") //
+        );
+        builder.setPrimaryKey(setOf(0, 1)); // columns 0 and 1 form the primary key
+        builder.setColumnPreferIndex(setOf(0));
+        builder.enableColumnBlock(new ImmutableBitSet[] { setOf(0, 1), setOf(2), setOf(3, 4) }); // three column groups: {0,1}, {2}, {3,4}
+        builder.enableRowBlock(4); // presumably 4 rows per row block - TODO confirm
+        GTInfo info = builder.build();
+        return info;
+    }
+
+    @SuppressWarnings("rawtypes") // Dictionary is used as a raw type in the map below
+    private static CubeCodeSystem newDictCodeSystem() { // code system with dictionaries registered for columns 1 and 2
+        Map<Integer, Dictionary> dictionaryMap = Maps.newHashMap(); // key: column index
+        dictionaryMap.put(1, newDictionaryOfInteger());
+        dictionaryMap.put(2, newDictionaryOfString());
+        return new CubeCodeSystem(dictionaryMap);
+    }
+
+    @SuppressWarnings("rawtypes") // the return type is the raw Dictionary
+    private static Dictionary newDictionaryOfString() { // trie dictionary holding the ten names written by newTestTable()
+        TrieDictionaryBuilder<String> builder = new TrieDictionaryBuilder<>(new StringBytesConverter());
+        builder.addValue("Dong");
+        builder.addValue("George");
+        builder.addValue("Jason");
+        builder.addValue("Kejia");
+        builder.addValue("Luke");
+        builder.addValue("Mahone");
+        builder.addValue("Qianhao");
+        builder.addValue("Shaofeng");
+        builder.addValue("Xu");
+        builder.addValue("Yang");
+        return builder.build(0); // NOTE(review): the meaning of argument 0 is not visible here - confirm against TrieDictionaryBuilder
+    }
+
+    @SuppressWarnings("rawtypes") // the return type is the raw Dictionary
+    private static Dictionary newDictionaryOfInteger() { // number dictionary holding "10", "20", ... "100" as strings
+        NumberDictionaryBuilder<String> builder = new NumberDictionaryBuilder<>(new StringBytesConverter());
+        builder.addValue("10");
+        builder.addValue("20");
+        builder.addValue("30");
+        builder.addValue("40");
+        builder.addValue("50");
+        builder.addValue("60");
+        builder.addValue("70");
+        builder.addValue("80");
+        builder.addValue("90");
+        builder.addValue("100");
+        return builder.build(0); // NOTE(review): the meaning of argument 0 is not visible here - confirm against NumberDictionaryBuilder
+    }
+
+    private static ImmutableBitSet setOf(int... values) { // ImmutableBitSet with exactly the given bit positions set
+        BitSet set = new BitSet();
+        for (int i : values)
+            set.set(i);
+        return new ImmutableBitSet(set);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
new file mode 100644
index 0000000..c87e970
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
@@ -0,0 +1,195 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.gridtable.GTBuilder;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GridTable;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.gridtable.UnitTestSupport;
+import org.apache.kylin.gridtable.memstore.GTSimpleMemStore;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.junit.Test;
+
+public class SimpleGridTableTest {
+
+    @Test
+    public void testBasics() throws IOException { // rebuild + full scan on UnitTestSupport.basicInfo()
+        GTInfo info = UnitTestSupport.basicInfo();
+        GTSimpleMemStore store = new GTSimpleMemStore(info);
+        GridTable table = new GridTable(info, store);
+
+        GTBuilder builder = rebuild(table);
+        IGTScanner scanner = scan(table);
+        assertEquals(builder.getWrittenRowBlockCount(), scanner.getScannedRowBlockCount()); // everything written must be scanned back
+        assertEquals(builder.getWrittenRowCount(), scanner.getScannedRowCount());
+    }
+
+    @Test
+    public void testAdvanced() throws IOException { // same check as testBasics, on UnitTestSupport.advancedInfo()
+        GTInfo info = UnitTestSupport.advancedInfo();
+        GTSimpleMemStore store = new GTSimpleMemStore(info);
+        GridTable table = new GridTable(info, store);
+
+        GTBuilder builder = rebuild(table);
+        IGTScanner scanner = scan(table);
+        assertEquals(builder.getWrittenRowBlockCount(), scanner.getScannedRowBlockCount()); // everything written must be scanned back
+        assertEquals(builder.getWrittenRowCount(), scanner.getScannedRowCount());
+    }
+
+    @Test
+    public void testAggregate() throws IOException { // rebuild + aggregating scan on UnitTestSupport.advancedInfo()
+        GTInfo info = UnitTestSupport.advancedInfo();
+        GTSimpleMemStore store = new GTSimpleMemStore(info);
+        GridTable table = new GridTable(info, store);
+
+        GTBuilder builder = rebuild(table);
+        IGTScanner scanner = scanAndAggregate(table);
+        assertEquals(builder.getWrittenRowBlockCount(), scanner.getScannedRowBlockCount()); // scanned counts are compared with written counts even though the scan aggregates
+        assertEquals(builder.getWrittenRowCount(), scanner.getScannedRowCount());
+    }
+
+    @Test
+    public void testAppend() throws IOException { // fills the table through several append sessions, then scans it
+        GTInfo info = UnitTestSupport.advancedInfo();
+        GTSimpleMemStore store = new GTSimpleMemStore(info);
+        GridTable table = new GridTable(info, store);
+
+        rebuildViaAppend(table);
+        IGTScanner scanner = scan(table);
+        assertEquals(3, scanner.getScannedRowBlockCount()); // the appended records are expected to end up in 3 row blocks
+        assertEquals(10, scanner.getScannedRowCount()); // 4 + 3 + 2 + 1 records appended
+    }
+
+    private IGTScanner scan(GridTable table) throws IOException { // full scan checking every record; returns the closed scanner so callers can read its counters
+        IGTScanner scanner = table.scan(new GTScanRequest(table.getInfo()));
+        try { // close in finally so a failed assertion no longer leaves the scanner open
+            for (GTRecord r : scanner) {
+                Object[] v = r.getValues();
+                assertTrue(((String) v[0]).startsWith("2015-"));
+                assertTrue(((String) v[2]).equals("Food"));
+                assertTrue(((LongMutable) v[3]).get() == 10);
+                assertTrue(((BigDecimal) v[4]).doubleValue() == 10.5);
+                System.out.println(r);
+            }
+        } finally { scanner.close(); }
+        System.out.println("Scanned Row Block Count: " + scanner.getScannedRowBlockCount());
+        System.out.println("Scanned Row Count: " + scanner.getScannedRowCount());
+        return scanner;
+    }
+
+    private IGTScanner scanAndAggregate(GridTable table) throws IOException { // aggregating scan; checks the metrics of each returned record by position and returns the closed scanner
+        GTScanRequest req = new GTScanRequest(table.getInfo(), null, setOf(0, 2), setOf(3, 4), new String[] { "count", "sum" }, null);
+        IGTScanner scanner = table.scan(req);
+        int i = 0;
+        try { // close in finally so a failed assertion no longer leaves the scanner open
+            for (GTRecord r : scanner) {
+                Object[] v = r.getValues();
+                switch (i++) { // position of the record in the result; a fifth record fails the test
+                case 0:
+                    assertTrue(((LongMutable) v[3]).get() == 20);
+                    assertTrue(((BigDecimal) v[4]).doubleValue() == 21.0);
+                    break;
+                case 1:
+                    assertTrue(((LongMutable) v[3]).get() == 30);
+                    assertTrue(((BigDecimal) v[4]).doubleValue() == 31.5);
+                    break;
+                case 2:
+                    assertTrue(((LongMutable) v[3]).get() == 40);
+                    assertTrue(((BigDecimal) v[4]).doubleValue() == 42.0);
+                    break;
+                case 3:
+                    assertTrue(((LongMutable) v[3]).get() == 10);
+                    assertTrue(((BigDecimal) v[4]).doubleValue() == 10.5);
+                    break;
+                default:
+                    fail();
+                }
+                System.out.println(r);
+            }
+        } finally { scanner.close(); }
+        System.out.println("Scanned Row Block Count: " + scanner.getScannedRowBlockCount());
+        System.out.println("Scanned Row Count: " + scanner.getScannedRowCount());
+        return scanner;
+    }
+
+    static GTBuilder rebuild(GridTable table) throws IOException { // rewrites the table with UnitTestSupport.mockupData(info, 10); returns the closed builder for its counters
+        GTBuilder builder = table.rebuild();
+        for (GTRecord rec : UnitTestSupport.mockupData(table.getInfo(), 10)) {
+            builder.write(rec);
+        }
+        builder.close();
+
+        System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
+        System.out.println("Written Row Count: " + builder.getWrittenRowCount());
+        return builder;
+    }
+    
+    static void rebuildViaAppend(GridTable table) throws IOException { // writes the ten mockup records through four append sessions of 4, 3, 2 and 1 records
+        List<GTRecord> data = UnitTestSupport.mockupData(table.getInfo(), 10);
+        GTBuilder builder;
+        int i = 0; // index of the next record to write; advanced by every data.get(i++)
+
+        builder = table.append();
+        builder.write(data.get(i++));
+        builder.write(data.get(i++));
+        builder.write(data.get(i++));
+        builder.write(data.get(i++));
+        builder.close();
+        System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
+        System.out.println("Written Row Count: " + builder.getWrittenRowCount());
+
+        builder = table.append();
+        builder.write(data.get(i++));
+        builder.write(data.get(i++));
+        builder.write(data.get(i++));
+        builder.close();
+        System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
+        System.out.println("Written Row Count: " + builder.getWrittenRowCount());
+
+        builder = table.append();
+        builder.write(data.get(i++));
+        builder.write(data.get(i++));
+        builder.close();
+        System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
+        System.out.println("Written Row Count: " + builder.getWrittenRowCount());
+
+        builder = table.append();
+        builder.write(data.get(i++));
+        builder.close();
+        System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
+        System.out.println("Written Row Count: " + builder.getWrittenRowCount());
+    }
+
+    private static ImmutableBitSet setOf(int... values) { // ImmutableBitSet with exactly the given bit positions set
+        BitSet set = new BitSet();
+        for (int i : values)
+            set.set(i);
+        return new ImmutableBitSet(set);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java
new file mode 100644
index 0000000..f5c9645
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java
@@ -0,0 +1,188 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+import static org.junit.Assert.*;
+import it.uniroma3.mat.extendedset.intset.ConciseSet;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTInvertedIndex;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTRowBlock;
+import org.apache.kylin.gridtable.UnitTestSupport;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.apache.kylin.metadata.measure.serializer.StringSerializer;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class SimpleInvertedIndexTest {
+
+    GTInfo info; // table info from UnitTestSupport.advancedInfo(), set in the constructor
+    GTInvertedIndex index; // index under test, filled block by block in the constructor
+    ArrayList<CompareTupleFilter> basicFilters = Lists.newArrayList(); // basicFilters.get(i) is expected to yield basicResults.get(i)
+    ArrayList<ConciseSet> basicResults = Lists.newArrayList(); // expected block-id sets, parallel to basicFilters
+
+    public SimpleInvertedIndexTest() { // builds a ten-block index on column 0 and the table of basic filters with their expected block sets
+        
+        info = UnitTestSupport.advancedInfo();
+        TblColRef colA = info.colRef(0);
+        
+        // block i contains value "i", the last is NULL
+        index = new GTInvertedIndex(info);
+        GTRowBlock mockBlock = GTRowBlock.allocate(info);
+        GTRowBlock.Writer writer = mockBlock.getWriter();
+        GTRecord record = new GTRecord(info);
+        for (int i = 0; i < 10; i++) {
+            record.setValues(i < 9 ? "" + i : null, "", "", new LongMutable(0), new BigDecimal(0));
+            for (int j = 0; j < info.getRowBlockSize(); j++) { // fill the block with getRowBlockSize() copies of the same record
+                writer.append(record);
+            }
+            writer.readyForFlush();
+            index.add(mockBlock);
+            // the same block object is cleared and refilled on the next iteration
+            writer.clearForNext();
+        }
+        
+        basicFilters.add(compare(colA, FilterOperatorEnum.ISNULL));
+        basicResults.add(set(9));
+
+        basicFilters.add(compare(colA, FilterOperatorEnum.ISNOTNULL));
+        basicResults.add(set(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)); // all blocks expected, including the NULL block 9
+
+        basicFilters.add(compare(colA, FilterOperatorEnum.EQ, 0));
+        basicResults.add(set(0));
+
+        basicFilters.add(compare(colA, FilterOperatorEnum.NEQ, 0));
+        basicResults.add(set(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)); // all blocks expected, including block 0
+
+        basicFilters.add(compare(colA, FilterOperatorEnum.IN, 0, 5));
+        basicResults.add(set(0, 5));
+
+        basicFilters.add(compare(colA, FilterOperatorEnum.NOTIN, 0, 5));
+        basicResults.add(set(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)); // all blocks expected, including blocks 0 and 5
+
+        basicFilters.add(compare(colA, FilterOperatorEnum.LT, 3));
+        basicResults.add(set(0, 1, 2));
+
+        basicFilters.add(compare(colA, FilterOperatorEnum.LTE, 3));
+        basicResults.add(set(0, 1, 2, 3));
+
+        basicFilters.add(compare(colA, FilterOperatorEnum.GT, 3));
+        basicResults.add(set(4, 5, 6, 7, 8)); // the NULL block 9 is not expected for range comparisons
+
+        basicFilters.add(compare(colA, FilterOperatorEnum.GTE, 3));
+        basicResults.add(set(3, 4, 5, 6, 7, 8));
+    }
+
+    @Test
+    public void testBasics() { // each basic filter alone must yield its expected block set
+        for (int i = 0; i < basicFilters.size(); i++) {
+            assertEquals(basicResults.get(i), index.filter(basicFilters.get(i)));
+        }
+    }
+
+    @Test
+    public void testLogicalAnd() { // AND of every ordered pair of basic filters must yield the intersection of their expected sets
+        for (int i = 0; i < basicFilters.size(); i++) {
+            for (int j = 0; j < basicFilters.size(); j++) {
+                LogicalTupleFilter f = logical(FilterOperatorEnum.AND, basicFilters.get(i), basicFilters.get(j));
+                ConciseSet r = basicResults.get(i).clone(); // clone so the shared expected set is not modified
+                r.retainAll(basicResults.get(j));
+                assertEquals(r, index.filter(f));
+            }
+        }
+    }
+
+    @Test
+    public void testLogicalOr() { // OR of every ordered pair of basic filters must yield the union of their expected sets
+        for (int i = 0; i < basicFilters.size(); i++) {
+            for (int j = 0; j < basicFilters.size(); j++) {
+                LogicalTupleFilter f = logical(FilterOperatorEnum.OR, basicFilters.get(i), basicFilters.get(j));
+                ConciseSet r = basicResults.get(i).clone(); // clone so the shared expected set is not modified
+                r.addAll(basicResults.get(j));
+                assertEquals(r, index.filter(f));
+            }
+        }
+    }
+
+    @Test
+    public void testNotEvaluable() { // a filter on column 1 is expected to select every block, alone or inside OR, and to be neutral inside AND
+        ConciseSet all = set(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+        // column 1 instead of column 0: presumably not covered by the index - TODO confirm against advancedInfo()
+        CompareTupleFilter notEvaluable = compare(info.colRef(1), FilterOperatorEnum.EQ, 0);
+        assertEquals(all, index.filter(notEvaluable));
+
+        LogicalTupleFilter or = logical(FilterOperatorEnum.OR, basicFilters.get(0), notEvaluable);
+        assertEquals(all, index.filter(or));
+
+        LogicalTupleFilter and = logical(FilterOperatorEnum.AND, basicFilters.get(0), notEvaluable);
+        assertEquals(basicResults.get(0), index.filter(and)); // only the ISNULL part (basicFilters.get(0)) narrows the result
+    }
+
+    public static CompareTupleFilter compare(TblColRef col, TupleFilter.FilterOperatorEnum op, int... ids) { // builds "col op ids..."; ids may be empty (e.g. ISNULL)
+        CompareTupleFilter filter = new CompareTupleFilter(op);
+        filter.addChild(columnFilter(col)); // first child: the column
+        for (int i : ids) {
+            filter.addChild(constFilter(i)); // one ConstantTupleFilter child per id
+        }
+        return filter;
+    }
+
+    public static LogicalTupleFilter logical(TupleFilter.FilterOperatorEnum op, TupleFilter... filters) { // logical filter of kind op over the given children
+        LogicalTupleFilter filter = new LogicalTupleFilter(op);
+        for (TupleFilter f : filters)
+            filter.addChild(f);
+        return filter;
+    }
+
+    public static ColumnTupleFilter columnFilter(TblColRef col) { // wraps col in a ColumnTupleFilter
+        return new ColumnTupleFilter(col);
+    }
+
+    public static ConstantTupleFilter constFilter(int id) { // constant filter holding id serialized as a string
+        byte[] space = new byte[10]; // fixed 10-byte scratch buffer for the serialized value
+        ByteBuffer buf = ByteBuffer.wrap(space);
+        StringSerializer stringSerializer = new StringSerializer(DataType.getInstance("string"));
+        stringSerializer.serialize("" + id, buf); // the decimal text of id, not the int itself
+        ByteArray data = new ByteArray(buf.array(), buf.arrayOffset(), buf.position()); // covers only the bytes written so far
+        return new ConstantTupleFilter(data);
+    }
+
+    public static ConciseSet set(int... ints) { // ConciseSet containing exactly the given ints
+        ConciseSet set = new ConciseSet();
+        for (int i : ints)
+            set.add(i);
+        return set;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/pom.xml
----------------------------------------------------------------------
diff --git a/core-job/pom.xml b/core-job/pom.xml
index b51c1cf..b619f43 100644
--- a/core-job/pom.xml
+++ b/core-job/pom.xml
@@ -41,8 +41,20 @@
             <version>${project.parent.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+        </dependency>
+
         <!-- Env & Test -->
         <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-common</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java b/core-job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
new file mode 100644
index 0000000..6b814ef
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public class BuildEngineFactory {
+    
+    private static IBatchCubingEngine defaultBatchEngine;
+    
+    public static IBatchCubingEngine defaultBatchEngine() {
+        if (defaultBatchEngine == null) {
+            KylinConfig conf = KylinConfig.getInstanceFromEnv();
+            if (conf.isCubingInMem()) {
+                defaultBatchEngine = (IBatchCubingEngine) ClassUtil.newInstance("org.apache.kylin.engine.mr.MRBatchCubingEngine2");
+            } else {
+                defaultBatchEngine = (IBatchCubingEngine) ClassUtil.newInstance("org.apache.kylin.engine.mr.MRBatchCubingEngine");
+            }
+        }
+        return defaultBatchEngine;
+    }
+    
+    /** Build a new cube segment, typically its time range appends to the end of current cube. */
+    public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
+        return defaultBatchEngine().createBatchCubingJob(newSegment, submitter);
+    }
+    
+    /** Merge multiple small segments into a big one. */
+    public static DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
+        return defaultBatchEngine().createBatchMergeJob(mergeSegment, submitter);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java b/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
new file mode 100644
index 0000000..904f557
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+/** Contract for an engine that creates batch cube build and merge jobs. */
+public interface IBatchCubingEngine {
+
+    /** Build a new cube segment, typically its time range appends to the end of current cube. */
+    DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter);
+
+    /** Merge multiple small segments into a big one. */
+    DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter);
+
+    /**
+     * NOTE(review): presumably the interface type this engine requires from a data source;
+     * confirm against the engine implementations.
+     */
+    Class<?> getSourceInterface();
+
+    /**
+     * NOTE(review): presumably the interface type this engine requires from a storage;
+     * confirm against the engine implementations.
+     */
+    Class<?> getStorageInterface();
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java b/core-job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java
new file mode 100644
index 0000000..0359ce9
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java
@@ -0,0 +1,8 @@
+package org.apache.kylin.engine;
+
+import org.apache.kylin.cube.CubeSegment;
+
+/** Contract for an engine that produces a streaming cubing builder for a cube segment. */
+public interface IStreamingCubingEngine {
+
+    /** Creates a {@link Runnable} builder for the given segment. */
+    Runnable createStreamingCubingBuilder(CubeSegment seg);
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JobInstance.java b/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
new file mode 100644
index 0000000..e7f5772
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonBackReference;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonManagedReference;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Lists;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.cube.model.CubeBuildTypeEnum;
+import org.apache.kylin.job.constant.JobStatusEnum;
+import org.apache.kylin.job.constant.JobStepCmdTypeEnum;
+import org.apache.kylin.job.constant.JobStepStatusEnum;
+import org.apache.kylin.job.engine.JobEngineConfig;
+
+@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
+public class JobInstance extends RootPersistentEntity implements Comparable<JobInstance> {
+
+    public static final String JOB_WORKING_DIR_PREFIX = "kylin-";
+
+    public static final String YARN_APP_ID = "yarn_application_id";
+    public static final String YARN_APP_URL = "yarn_application_tracking_url";
+    public static final String MR_JOB_ID = "mr_job_id";
+    public static final String HDFS_BYTES_WRITTEN = "hdfs_bytes_written";
+    public static final String SOURCE_RECORDS_COUNT = "source_records_count";
+    public static final String SOURCE_RECORDS_SIZE = "source_records_size";
+
+    public static String getStepIdentity(JobInstance jobInstance, JobStep jobStep) {
+        return jobInstance.getRelatedCube() + "." + jobInstance.getUuid() + "." + jobStep.getSequenceID();
+    }
+
+    public static String getJobIdentity(JobInstance jobInstance) {
+        return jobInstance.getRelatedCube() + "." + jobInstance.getUuid();
+    }
+
+    public static String getJobWorkingDir(JobInstance jobInstance, JobEngineConfig engineConfig) {
+        return getJobWorkingDir(jobInstance.getUuid(), engineConfig.getHdfsWorkingDirectory());
+    }
+
+    public static String getJobWorkingDir(String jobUuid, String hdfsWorkdingDir) {
+        if (jobUuid == null || jobUuid.equals("")) {
+            throw new IllegalArgumentException("jobUuid can't be null or empty");
+        }
+        return hdfsWorkdingDir + "/" + JOB_WORKING_DIR_PREFIX + jobUuid;
+    }
+
+    @JsonProperty("name")
+    private String name;
+
+    @JsonProperty("type")
+    private CubeBuildTypeEnum type; // java implementation
+    @JsonProperty("duration")
+    private long duration;
+    @JsonProperty("related_cube")
+    private String relatedCube;
+    @JsonProperty("related_segment")
+    private String relatedSegment;
+    @JsonProperty("exec_start_time")
+    private long execStartTime;
+    @JsonProperty("exec_end_time")
+    private long execEndTime;
+    @JsonProperty("mr_waiting")
+    private long mrWaiting = 0;
+    @JsonManagedReference
+    @JsonProperty("steps")
+    private List<JobStep> steps;
+    @JsonProperty("submitter")
+    private String submitter;
+    @JsonProperty("job_status")
+    private JobStatusEnum status;
+
+    public JobStep getRunningStep() {
+        for (JobStep step : this.getSteps()) {
+            if (step.getStatus().equals(JobStepStatusEnum.RUNNING) || step.getStatus().equals(JobStepStatusEnum.WAITING)) {
+                return step;
+            }
+        }
+
+        return null;
+    }
+
+    @JsonProperty("progress")
+    public double getProgress() {
+        int completedStepCount = 0;
+        for (JobStep step : this.getSteps()) {
+            if (step.getStatus().equals(JobStepStatusEnum.FINISHED)) {
+                completedStepCount++;
+            }
+        }
+
+        return 100.0 * completedStepCount / steps.size();
+    }
+
+    public JobStatusEnum getStatus() {
+        return this.status;
+    }
+
+    public void setStatus(JobStatusEnum status) {
+        this.status = status;
+    }
+
+//    @JsonProperty("job_status")
+//    public JobStatusEnum getStatus() {
+//
+//        // JobStatusEnum finalJobStatus;
+//        int compositResult = 0;
+//
+//        // if steps status are all NEW, then job status is NEW
+//        // if steps status are all FINISHED, then job status is FINISHED
+//        // if steps status are all PENDING, then job status is PENDING
+//        // if steps status are FINISHED and PENDING, the job status is PENDING
+//        // if one of steps status is RUNNING, then job status is RUNNING
+//        // if one of steps status is ERROR, then job status is ERROR
+//        // if one of steps status is KILLED, then job status is KILLED
+//        // default status is RUNNING
+//
+//        System.out.println(this.getName());
+//
+//        for (JobStep step : this.getSteps()) {
+//            //System.out.println("step: " + step.getSequenceID() + "'s status:" + step.getStatus());
+//            compositResult = compositResult | step.getStatus().getCode();
+//        }
+//
+//        System.out.println();
+//
+//        if (compositResult == JobStatusEnum.FINISHED.getCode()) {
+//            return JobStatusEnum.FINISHED;
+//        } else if (compositResult == JobStatusEnum.NEW.getCode()) {
+//            return JobStatusEnum.NEW;
+//        } else if (compositResult == JobStatusEnum.PENDING.getCode()) {
+//            return JobStatusEnum.PENDING;
+//        } else if (compositResult == (JobStatusEnum.FINISHED.getCode() | JobStatusEnum.PENDING.getCode())) {
+//            return JobStatusEnum.PENDING;
+//        } else if ((compositResult & JobStatusEnum.ERROR.getCode()) == JobStatusEnum.ERROR.getCode()) {
+//            return JobStatusEnum.ERROR;
+//        } else if ((compositResult & JobStatusEnum.DISCARDED.getCode()) == JobStatusEnum.DISCARDED.getCode()) {
+//            return JobStatusEnum.DISCARDED;
+//        } else if ((compositResult & JobStatusEnum.RUNNING.getCode()) == JobStatusEnum.RUNNING.getCode()) {
+//            return JobStatusEnum.RUNNING;
+//        }
+//
+//        return JobStatusEnum.RUNNING;
+//    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public CubeBuildTypeEnum getType() {
+        return type;
+    }
+
+    public void setType(CubeBuildTypeEnum type) {
+        this.type = type;
+    }
+
+    public long getDuration() {
+        return duration;
+    }
+
+    public void setDuration(long duration) {
+        this.duration = duration;
+    }
+
+    public String getRelatedCube() {
+        return relatedCube;
+    }
+
+    public void setRelatedCube(String relatedCube) {
+        this.relatedCube = relatedCube;
+    }
+
+    public String getRelatedSegment() {
+        return relatedSegment;
+    }
+
+    public void setRelatedSegment(String relatedSegment) {
+        this.relatedSegment = relatedSegment;
+    }
+
+    /**
+     * @return the execStartTime
+     */
+    public long getExecStartTime() {
+        return execStartTime;
+    }
+
+    /**
+     * @param execStartTime the execStartTime to set
+     */
+    public void setExecStartTime(long execStartTime) {
+        this.execStartTime = execStartTime;
+    }
+
+    /**
+     * @return the execEndTime
+     */
+    public long getExecEndTime() {
+        return execEndTime;
+    }
+
+    /**
+     * @param execEndTime the execEndTime to set
+     */
+    public void setExecEndTime(long execEndTime) {
+        this.execEndTime = execEndTime;
+    }
+
+    public long getMrWaiting() {
+        return this.mrWaiting;
+    }
+
+    public void setMrWaiting(long mrWaiting) {
+        this.mrWaiting = mrWaiting;
+    }
+
+    public List<JobStep> getSteps() {
+        if (steps == null) {
+            steps = Lists.newArrayList();
+        }
+        return steps;
+    }
+
+    public void clearSteps() {
+        getSteps().clear();
+    }
+
+    public void addSteps(Collection<JobStep> steps) {
+        this.getSteps().addAll(steps);
+    }
+
+    public void addStep(JobStep step) {
+        getSteps().add(step);
+    }
+
+    public void addStep(int index, JobStep step) {
+        getSteps().add(index, step);
+    }
+
+    public JobStep findStep(String stepName) {
+        for (JobStep step : getSteps()) {
+            if (stepName.equals(step.getName())) {
+                return step;
+            }
+        }
+        return null;
+    }
+
+        
+    public String getSubmitter() {
+        return submitter;
+    }
+
+    public void setSubmitter(String submitter) {
+        this.submitter = submitter;
+    }
+
+
+
+
+    @JsonIgnoreProperties(ignoreUnknown = true)
+    public static class JobStep implements Comparable<JobStep> {
+
+        @JsonBackReference
+        private JobInstance jobInstance;
+        
+        @JsonProperty("id")
+        private String id;
+
+        @JsonProperty("name")
+        private String name;
+
+        @JsonProperty("sequence_id")
+        private int sequenceID;
+
+        @JsonProperty("exec_cmd")
+        private String execCmd;
+
+        @JsonProperty("interrupt_cmd")
+        private String InterruptCmd;
+
+        @JsonProperty("exec_start_time")
+        private long execStartTime;
+        @JsonProperty("exec_end_time")
+        private long execEndTime;
+        @JsonProperty("exec_wait_time")
+        private long execWaitTime;
+
+        @JsonProperty("step_status")
+        private JobStepStatusEnum status;
+
+        @JsonProperty("cmd_type")
+        private JobStepCmdTypeEnum cmdType = JobStepCmdTypeEnum.SHELL_CMD_HADOOP;
+
+        @JsonProperty("info")
+        private ConcurrentHashMap<String, String> info = new ConcurrentHashMap<String, String>();
+
+        @JsonProperty("run_async")
+        private boolean runAsync = false;
+
+        private ConcurrentHashMap<String, String> getInfo() {
+            return info;
+        }
+
+        public void putInfo(String key, String value) {
+            getInfo().put(key, value);
+        }
+
+        public String getInfo(String key) {
+            return getInfo().get(key);
+        }
+
+        public void clearInfo() {
+            getInfo().clear();
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public void setName(String name) {
+            this.name = name;
+        }
+
+        public int getSequenceID() {
+            return sequenceID;
+        }
+
+        public void setSequenceID(int sequenceID) {
+            this.sequenceID = sequenceID;
+        }
+
+        public String getExecCmd() {
+            return execCmd;
+        }
+
+        public void setExecCmd(String execCmd) {
+            this.execCmd = execCmd;
+        }
+
+        public JobStepStatusEnum getStatus() {
+            return status;
+        }
+
+        public void setStatus(JobStepStatusEnum status) {
+            this.status = status;
+        }
+        
+        
+
+        public String getId() {
+            return id;
+        }
+
+        public void setId(String id) {
+            this.id = id;
+        }
+
+        /**
+         * @return the execStartTime
+         */
+        public long getExecStartTime() {
+            return execStartTime;
+        }
+
+        /**
+         * @param execStartTime the execStartTime to set
+         */
+        public void setExecStartTime(long execStartTime) {
+            this.execStartTime = execStartTime;
+        }
+
+        /**
+         * @return the execEndTime
+         */
+        public long getExecEndTime() {
+            return execEndTime;
+        }
+
+        /**
+         * @param execEndTime the execEndTime to set
+         */
+        public void setExecEndTime(long execEndTime) {
+            this.execEndTime = execEndTime;
+        }
+
+        public long getExecWaitTime() {
+            return execWaitTime;
+        }
+
+        public void setExecWaitTime(long execWaitTime) {
+            this.execWaitTime = execWaitTime;
+        }
+
+        public String getInterruptCmd() {
+            return InterruptCmd;
+        }
+
+        public void setInterruptCmd(String interruptCmd) {
+            InterruptCmd = interruptCmd;
+        }
+
+        public JobStepCmdTypeEnum getCmdType() {
+            return cmdType;
+        }
+
+        public void setCmdType(JobStepCmdTypeEnum cmdType) {
+            this.cmdType = cmdType;
+        }
+
+        /**
+         * @return the runAsync
+         */
+        public boolean isRunAsync() {
+            return runAsync;
+        }
+
+        /**
+         * @param runAsync the runAsync to set
+         */
+        public void setRunAsync(boolean runAsync) {
+            this.runAsync = runAsync;
+        }
+
+        /**
+         * @return the jobInstance
+         */
+        public JobInstance getJobInstance() {
+            return jobInstance;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + ((name == null) ? 0 : name.hashCode());
+            result = prime * result + sequenceID;
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj)
+                return true;
+            if (obj == null)
+                return false;
+            if (getClass() != obj.getClass())
+                return false;
+            JobStep other = (JobStep) obj;
+            if (name == null) {
+                if (other.name != null)
+                    return false;
+            } else if (!name.equals(other.name))
+                return false;
+            if (sequenceID != other.sequenceID)
+                return false;
+            return true;
+        }
+
+        @Override
+        public int compareTo(JobStep o) {
+            if (this.sequenceID < o.sequenceID) {
+                return -1;
+            } else if (this.sequenceID > o.sequenceID) {
+                return 1;
+            } else {
+                return 0;
+            }
+        }
+    }
+
+    @Override
+    public int compareTo(JobInstance o) {
+        return o.lastModified<this.lastModified?-1:o.lastModified>this.lastModified?1:0;
+    }
+
+}