Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/24 01:20:24 UTC

[16/28] incubator-kylin git commit: KYLIN-875 Split job module into 'core-job', 'engine-mr', 'source-hive', 'storage-hbase'. The old job remains as an assembly project.

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeSamplingTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeSamplingTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeSamplingTest.java
new file mode 100644
index 0000000..e3397c0
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeSamplingTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+/**
+ * Micro-benchmark that estimates per-cuboid cardinality by hashing sampled rows
+ * into one HyperLogLog counter per cuboid.
+ */
+public class CubeSamplingTest {
+
+    private static final int ROW_LENGTH = 10;
+
+    private final List<String> row = new ArrayList<String>(ROW_LENGTH);
+    private final ByteArray[] row_index = new ByteArray[ROW_LENGTH];
+
+    private Integer[][] allCuboidsBitSet;
+    private HashFunction hf = null;
+    private long baseCuboidId;
+    private HyperLogLogPlusCounter[] allCuboidsHLL = null;
+    private final byte[] separator = Bytes.toBytes(",");
+
+    @Before
+    public void setup() {
+
+        baseCuboidId = (1L << ROW_LENGTH) - 1;
+        List<Long> allCuboids = Lists.newArrayList();
+        List<Integer[]> allCuboidsBitSetList = Lists.newArrayList();
+        for (long i = 1; i < baseCuboidId; i++) {
+            allCuboids.add(i);
+            addCuboidBitSet(i, allCuboidsBitSetList);
+        }
+
+        allCuboidsBitSet = allCuboidsBitSetList.toArray(new Integer[allCuboidsBitSetList.size()][]);
+        System.out.println("Totally have " + allCuboidsBitSet.length + " cuboids.");
+        allCuboidsHLL = new HyperLogLogPlusCounter[allCuboids.size()];
+        for (int i = 0; i < allCuboids.size(); i++) {
+            allCuboidsHLL[i] = new HyperLogLogPlusCounter(14);
+        }
+
+        // alternative hash functions tried: Hashing.goodFastHash(32), Hashing.md5()
+        hf = Hashing.murmur3_32();
+
+        for (int i = 0; i < ROW_LENGTH; i++) {
+            row_index[i] = new ByteArray();
+        }
+    }
+
+    private void addCuboidBitSet(long cuboidId, List<Integer[]> allCuboidsBitSet) {
+        BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
+        Integer[] indices = new Integer[bitSet.cardinality()];
+
+        long mask = Long.highestOneBit(baseCuboidId);
+        int position = 0;
+        for (int i = 0; i < ROW_LENGTH; i++) {
+            if ((mask & cuboidId) > 0) {
+                indices[position] = i;
+                position++;
+            }
+            mask = mask >> 1;
+        }
+
+        allCuboidsBitSet.add(indices);
+
+    }
+
+    @Test
+    public void test() {
+
+        long start = System.currentTimeMillis();
+        List<String> row;
+        for (int i = 0; i < 10000; i++) {
+            row = getRandomRow();
+            putRowKeyToHLL(row);
+        }
+
+        long duration = System.currentTimeMillis() - start;
+        System.out.println("The test takes " + duration / 1000 + "seconds.");
+    }
+
+    private void putRowKeyToHLL(List<String> row) {
+        int x = 0;
+        for (String field : row) {
+            Hasher hc = hf.newHasher();
+            row_index[x++].set(hc.putString(field).hash().asBytes());
+        }
+
+        for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
+            Hasher hc = hf.newHasher();
+            for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
+                hc.putBytes(row_index[allCuboidsBitSet[i][position]].array());
+                hc.putBytes(separator);
+            }
+            allCuboidsHLL[i].add(hc.hash().asBytes());
+        }
+    }
+
+    private List<String> getRandomRow() {
+        row.clear();
+        for (int i = 0; i < ROW_LENGTH; i++) {
+            row.add(RandomStringUtils.random(10));
+        }
+        return row;
+    }
+}

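For readers skimming the diff: each cuboid id above is a bitmask over the ROW_LENGTH rowkey columns, and addCuboidBitSet decodes that mask into the indices of the columns the cuboid covers. A minimal standalone sketch of that decoding, in plain Java and using the same 10-column row length as the test (the class name is hypothetical):

import java.util.ArrayList;
import java.util.List;

public class CuboidBitSetSketch {

    private static final int ROW_LENGTH = 10;                   // same as CubeSamplingTest
    private static final long BASE_CUBOID_ID = (1L << ROW_LENGTH) - 1;

    /** Decode a cuboid id into the indices of the rowkey columns it covers. */
    static List<Integer> columnsOf(long cuboidId) {
        List<Integer> indices = new ArrayList<Integer>();
        long mask = Long.highestOneBit(BASE_CUBOID_ID);          // start from the highest column bit
        for (int i = 0; i < ROW_LENGTH; i++) {
            if ((mask & cuboidId) > 0) {
                indices.add(i);
            }
            mask >>= 1;
        }
        return indices;
    }

    public static void main(String[] args) {
        System.out.println(columnsOf(BASE_CUBOID_ID));           // all columns: [0, 1, ..., 9]
        System.out.println(columnsOf(640L));                     // binary 1010000000 -> columns [0, 2]
    }
}

The sampling pass then hashes exactly those columns per row and feeds the combined hash into that cuboid's HyperLogLog counter, which is what putRowKeyToHLL does above.
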
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java
new file mode 100644
index 0000000..ae75c61
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java
@@ -0,0 +1,37 @@
+package org.apache.kylin.engine.mr.steps;
+
+import com.google.common.collect.Maps;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducer;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Verifies that FactDistinctColumnsReducer.writeCuboidStatistics can write a
+ * (here empty) cuboid statistics file to a local path.
+ */
+public class FactDistinctColumnsReducerTest {
+
+
+    @Test
+    public void testWriteCuboidStatistics() throws IOException {
+
+        final Configuration conf = HadoopUtil.getCurrentConfiguration();
+        final Path outputPath = new Path("file:///tmp/kylin/cuboidstatistics/" + UUID.randomUUID().toString());
+        // a freshly generated UUID path should not exist yet, so it is not pre-created here
+
+        System.out.println(outputPath);
+        Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = Maps.newHashMap();
+        FactDistinctColumnsReducer.writeCuboidStatistics(conf, outputPath, cuboidHLLMap, 100);
+
+
+    }
+}

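The test above calls writeCuboidStatistics with an empty map; a hedged sketch of how such a map could be populated, reusing the HyperLogLogPlusCounter API exactly as it appears in CubeSamplingTest earlier in this patch (the precision of 14, the sample keys and the cuboid id 255 are illustrative assumptions):

import java.util.Map;

import com.google.common.collect.Maps;

import org.apache.kylin.common.hll.HyperLogLogPlusCounter;

public class CuboidStatisticsSketch {

    /** Build a cuboid-id -> HLL map from a few sample rowkeys. */
    static Map<Long, HyperLogLogPlusCounter> sampleStatistics() {
        Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = Maps.newHashMap();
        HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14);  // precision as in CubeSamplingTest
        hll.add("rowkey-1".getBytes());
        hll.add("rowkey-2".getBytes());
        cuboidHLLMap.put(255L, hll);                                  // illustrative cuboid id
        return cuboidHLLMap;
    }
}

The resulting map would then be handed to FactDistinctColumnsReducer.writeCuboidStatistics exactly as in the test above.
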
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapperPerformanceTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapperPerformanceTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapperPerformanceTest.java
new file mode 100644
index 0000000..fad521f
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapperPerformanceTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * @author yangli9
+ * 
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class HiveToBaseCuboidMapperPerformanceTest {
+
+    String metadataUrl = "hbase:yadesk00:2181:/hbase-unsecure";
+    String cubeName = "test_kylin_cube_with_slr";
+    Path srcPath = new Path("/download/test_kylin_cube_with_slr_intermediate_table_64mb.seq");
+
+    @Ignore("convenient trial tool for dev")
+    @Test
+    public void test() throws IOException, InterruptedException {
+        Configuration hconf = new Configuration();
+        HiveToBaseCuboidMapper mapper = new HiveToBaseCuboidMapper();
+        Context context = MockupMapContext.create(hconf, metadataUrl, cubeName, null);
+
+        mapper.setup(context);
+
+        Reader reader = new Reader(hconf, SequenceFile.Reader.file(srcPath));
+        Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf);
+        Text value = new Text();
+
+        while (reader.next(key, value)) {
+            mapper.map(key, value, context);
+        }
+
+        reader.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidJobTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidJobTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidJobTest.java
new file mode 100644
index 0000000..45a2dcc
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidJobTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.engine.mr.steps.MergeCuboidJob;
+
+/**
+ * @author ysong1
+ */
+public class MergeCuboidJobTest extends LocalFileMetadataTestCase {
+
+    private Configuration conf;
+
+    @Before
+    public void setup() throws Exception {
+        conf = new Configuration();
+        conf.set("fs.default.name", "file:///");
+        conf.set("mapred.job.tracker", "local");
+
+        // for local runner out-of-memory issue
+        conf.set("mapreduce.task.io.sort.mb", "10");
+        createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    @Test
+    public void test() throws Exception {
+        // String input =
+        // "src/test/resources/data/base_cuboid,src/test/resources/data/6d_cuboid";
+        String output = "target/test-output/merged_cuboid";
+        String cubeName = "test_kylin_cube_with_slr_ready";
+        String jobname = "merge_cuboid";
+
+        File baseFolder = File.createTempFile("kylin-f24668f6-dcff-4cb6-a89b-77f1119df8fa-", "base");
+        baseFolder.delete();
+        baseFolder.mkdir();
+        FileUtils.copyDirectory(new File("src/test/resources/data/base_cuboid"), baseFolder);
+        baseFolder.deleteOnExit();
+
+        File sixDFolder = File.createTempFile("kylin-f24668f6-dcff-4cb6-a89b-77f1119df8fa-", "6d");
+        sixDFolder.delete();
+        sixDFolder.mkdir();
+        FileUtils.copyDirectory(new File("src/test/resources/data/base_cuboid"), sixDFolder);
+        sixDFolder.deleteOnExit();
+
+        FileUtil.fullyDelete(new File(output));
+
+        // CubeManager cubeManager =
+        // CubeManager.getInstanceFromEnv(getTestConfig());
+
+        String[] args = { "-input", baseFolder.getAbsolutePath() + "," + sixDFolder.getAbsolutePath(), "-cubename", cubeName, "-segmentname", "20130331080000_20131212080000", "-output", output, "-jobname", jobname };
+        assertEquals("Job failed", 0, ToolRunner.run(conf, new MergeCuboidJob(), args));
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
new file mode 100644
index 0000000..a8d9670
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.dict.TrieDictionary;
+import org.apache.kylin.engine.mr.steps.MergeCuboidMapper;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.source.ReadableTable.TableSignature;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author honma
+ */
+@SuppressWarnings("rawtypes")
+public class MergeCuboidMapperTest extends LocalFileMetadataTestCase {
+
+    private static final Logger logger = LoggerFactory.getLogger(MergeCuboidMapperTest.class);
+
+    MapDriver<Text, Text, Text, Text> mapDriver;
+    CubeManager cubeManager;
+    CubeInstance cube;
+    DictionaryManager dictionaryManager;
+
+    TblColRef lfn;
+    TblColRef lsi;
+    TblColRef ssc;
+
+    private DictionaryInfo makeSharedDict() throws IOException {
+        TableSignature signature = new TableSignature();
+        signature.setSize(100);
+        signature.setLastModifiedTime(System.currentTimeMillis());
+        signature.setPath("fake_common_dict");
+
+        DictionaryInfo newDictInfo = new DictionaryInfo("", "", 0, "string", signature);
+
+        List<byte[]> values = new ArrayList<byte[]>();
+        values.add(new byte[] { 101, 101, 101 });
+        values.add(new byte[] { 102, 102, 102 });
+        Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(DataType.getInstance(newDictInfo.getDataType()), values);
+        newDictInfo.setCardinality(dict.getSize());
+        dictionaryManager.trySaveNewDict(dict, newDictInfo);
+        ((TrieDictionary) dict).dump(System.out);
+
+        return newDictInfo;
+    }
+
+    @Before
+    public void setUp() throws Exception {
+
+        createTestMetadata();
+
+        logger.info("The metadataUrl is : " + getTestConfig());
+
+        MetadataManager.clearCache();
+        CubeManager.clearCache();
+        ProjectManager.clearCache();
+        DictionaryManager.clearCache();
+
+        // hack for distributed cache
+        // CubeManager.removeInstance(KylinConfig.createInstanceFromUri("../job/meta"));//to
+        // make sure the following mapper could get latest CubeManger
+        FileUtils.deleteDirectory(new File("../job/meta"));
+
+        MergeCuboidMapper mapper = new MergeCuboidMapper();
+        mapDriver = MapDriver.newMapDriver(mapper);
+
+        cubeManager = CubeManager.getInstance(getTestConfig());
+        cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_ready_2_segments");
+        dictionaryManager = DictionaryManager.getInstance(getTestConfig());
+        lfn = cube.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", "LSTG_FORMAT_NAME");
+        lsi = cube.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", "CAL_DT");
+        ssc = cube.getDescriptor().findColumnRef("DEFAULT.TEST_CATEGORY_GROUPINGS", "META_CATEG_NAME");
+
+        DictionaryInfo sharedDict = makeSharedDict();
+
+        boolean isFirstSegment = true;
+        for (CubeSegment segment : cube.getSegments()) {
+
+            TableSignature signature = new TableSignature();
+            signature.setSize(100);
+            signature.setLastModifiedTime(System.currentTimeMillis());
+            signature.setPath("fake_dict_for" + lfn.getName() + segment.getName());
+
+            DictionaryInfo newDictInfo = new DictionaryInfo(lfn.getTable(), lfn.getColumnDesc().getName(), lfn.getColumnDesc().getZeroBasedIndex(), "string", signature);
+
+            List<byte[]> values = new ArrayList<byte[]>();
+            values.add(new byte[] { 97, 97, 97 });
+            if (isFirstSegment)
+                values.add(new byte[] { 99, 99, 99 });
+            else
+                values.add(new byte[] { 98, 98, 98 });
+            Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(DataType.getInstance(newDictInfo.getDataType()), values);
+            newDictInfo.setCardinality(dict.getSize());
+            dictionaryManager.trySaveNewDict(dict, newDictInfo);
+            ((TrieDictionary) dict).dump(System.out);
+
+            segment.putDictResPath(lfn, newDictInfo.getResourcePath());
+            segment.putDictResPath(lsi, sharedDict.getResourcePath());
+            segment.putDictResPath(ssc, sharedDict.getResourcePath());
+
+            // cubeManager.saveResource(segment.getCubeInstance());
+            // cubeManager.afterCubeUpdated(segment.getCubeInstance());
+
+            isFirstSegment = false;
+        }
+
+        CubeUpdate cubeBuilder = new CubeUpdate(cube);
+        cubeBuilder.setToUpdateSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
+        cube = cubeManager.updateCube(cubeBuilder);
+
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+        FileUtils.deleteDirectory(new File("../job/meta"));
+    }
+
+    @Test
+    public void test() throws IOException, ParseException {
+
+        //        String cubeName = "test_kylin_cube_without_slr_left_join_ready_2_segments";
+
+        CubeSegment newSeg = cubeManager.mergeSegments(cube, 0L, 1386835200000L, false);
+        //        String segmentName = newSeg.getName();
+
+        final Dictionary<?> dictionary = cubeManager.getDictionary(newSeg, lfn);
+        assertNull(dictionary);
+        //        ((TrieDictionary) dictionary).dump(System.out);
+
+        // hack for distributed cache
+        //        File metaDir = new File("../job/meta");
+        //        FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl()), metaDir);
+        //
+        //        mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+        //        mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+        //        // mapDriver.getConfiguration().set(KylinConfig.KYLIN_METADATA_URL,
+        //        // "../job/meta");
+        //
+        //        byte[] key = new byte[] { 0, 0, 0, 0, 0, 0, 0, -92, 1, 1, 1 };
+        //        byte[] value = new byte[] { 1, 2, 3 };
+        //        byte[] newkey = new byte[] { 0, 0, 0, 0, 0, 0, 0, -92, 1, 1, 2 };
+        //        byte[] newvalue = new byte[] { 1, 2, 3 };
+        //
+        //        mapDriver.withInput(new Text(key), new Text(value));
+        //        mapDriver.withOutput(new Text(newkey), new Text(newvalue));
+        //        mapDriver.setMapInputPath(new Path("/apps/hdmi-prod/b_kylin/prod/kylin-f24668f6-dcff-4cb6-a89b-77f1119df8fa/vac_sw_cube_v4/cuboid/15d_cuboid"));
+        //
+        //        mapDriver.runTest();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MockupMapContext.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MockupMapContext.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MockupMapContext.java
new file mode 100644
index 0000000..847071d
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MockupMapContext.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
+import org.apache.hadoop.security.Credentials;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+
+/**
+ * @author yangli9
+ * 
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class MockupMapContext {
+
+    public static Context create(final Configuration hconf, String metadataUrl, String cubeName, final Object[] outKV) {
+
+        hconf.set(BatchConstants.CFG_CUBE_NAME, cubeName);
+
+        return new WrappedMapper().getMapContext(new MapContext() {
+
+            @Override
+            public boolean nextKeyValue() throws IOException, InterruptedException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Object getCurrentKey() throws IOException, InterruptedException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Object getCurrentValue() throws IOException, InterruptedException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public void write(Object key, Object value) throws IOException, InterruptedException {
+                System.out.println("Write -- k:" + key + ", v:" + value);
+                if (outKV != null) {
+                    outKV[0] = key;
+                    outKV[1] = value;
+                }
+            }
+
+            @Override
+            public OutputCommitter getOutputCommitter() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public TaskAttemptID getTaskAttemptID() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public void setStatus(String msg) {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public String getStatus() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public float getProgress() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Counter getCounter(Enum<?> counterName) {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Counter getCounter(String groupName, String counterName) {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Configuration getConfiguration() {
+                return hconf;
+            }
+
+            @Override
+            public Credentials getCredentials() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public JobID getJobID() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public int getNumReduceTasks() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Path getWorkingDirectory() throws IOException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Class<?> getOutputKeyClass() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Class<?> getOutputValueClass() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Class<?> getMapOutputKeyClass() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Class<?> getMapOutputValueClass() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public String getJobName() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Class<? extends InputFormat<?, ?>> getInputFormatClass() throws ClassNotFoundException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass() throws ClassNotFoundException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass() throws ClassNotFoundException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass() throws ClassNotFoundException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Class<? extends OutputFormat<?, ?>> getOutputFormatClass() throws ClassNotFoundException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Class<? extends Partitioner<?, ?>> getPartitionerClass() throws ClassNotFoundException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public RawComparator<?> getSortComparator() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public String getJar() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public RawComparator<?> getGroupingComparator() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public boolean getJobSetupCleanupNeeded() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public boolean getTaskCleanupNeeded() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public boolean getProfileEnabled() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public String getProfileParams() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public IntegerRanges getProfileTaskRange(boolean isMap) {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public String getUser() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public boolean getSymlink() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Path[] getArchiveClassPaths() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public URI[] getCacheArchives() throws IOException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public URI[] getCacheFiles() throws IOException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Path[] getLocalCacheArchives() throws IOException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Path[] getLocalCacheFiles() throws IOException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Path[] getFileClassPaths() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public String[] getArchiveTimestamps() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public String[] getFileTimestamps() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public int getMaxMapAttempts() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public int getMaxReduceAttempts() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public void progress() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public InputSplit getInputSplit() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public RawComparator<?> getCombinerKeyGroupingComparator() {
+                throw new NotImplementedException();
+            }
+        });
+    }
+}

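As a usage note: the outKV array passed to create() lets a caller capture whatever a mapper writes through this context, without running a real MapReduce job. A hedged sketch (the metadata URL and cube name are placeholders copied from HiveToBaseCuboidMapperPerformanceTest above; the class name is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.kylin.engine.mr.steps.MockupMapContext;

public class MockupMapContextUsageSketch {

    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static void main(String[] args) throws Exception {
        Configuration hconf = new Configuration();
        Object[] outKV = new Object[2];

        Context context = MockupMapContext.create(hconf, "hbase:yadesk00:2181:/hbase-unsecure", "test_kylin_cube_with_slr", outKV);

        // anything written through the mockup context is printed and captured in outKV
        context.write(new Text("key"), new Text("value"));
        System.out.println("captured: " + outKV[0] + " -> " + outKV[1]);
    }
}
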
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidJobTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidJobTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidJobTest.java
new file mode 100644
index 0000000..6ed9010
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidJobTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.engine.mr.steps.NDCuboidJob;
+
+/**
+ * @author George Song (ysong1)
+ * 
+ */
+public class NDCuboidJobTest extends LocalFileMetadataTestCase {
+
+    private Configuration conf;
+
+    @Before
+    public void setup() throws Exception {
+        conf = new Configuration();
+        conf.set("fs.default.name", "file:///");
+        conf.set("mapred.job.tracker", "local");
+
+        // for local runner out-of-memory issue
+        conf.set("mapreduce.task.io.sort.mb", "10");
+
+        createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    @Test
+    public void testJob6D() throws Exception {
+        String input = "src/test/resources/data/base_cuboid/";
+        String output = "target/test-output/6d_cuboid";
+        String cubeName = "test_kylin_cube_with_slr_1_new_segment";
+        String segmentName = "20130331080000_20131212080000";
+        String jobname = "6d_cuboid";
+        String level = "1";
+
+        FileUtil.fullyDelete(new File(output));
+
+        String[] args = { "-input", input, "-cubename", cubeName, "-segmentname", segmentName, "-output", output, "-jobname", jobname, "-level", level };
+        assertEquals("Job failed", 0, ToolRunner.run(conf, new NDCuboidJob(), args));
+    }
+
+    @Test
+    public void testJob5D() throws Exception {
+        final String input = "src/test/resources/data/6d_cuboid/";
+        final String output = "target/test-output/5d_cuboid";
+        final String cubeName = "test_kylin_cube_with_slr_1_new_segment";
+        String segmentName = "20130331080000_20131212080000";
+        String jobname = "5d_cuboid";
+        String level = "2";
+
+        FileUtil.fullyDelete(new File(output));
+
+        String[] args = { "-input", input, "-cubename", cubeName, "-segmentname", segmentName, "-output", output, "-jobname", jobname, "-level", level };
+        assertEquals("Job failed", 0, ToolRunner.run(conf, new NDCuboidJob(), args));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java
new file mode 100644
index 0000000..9f50cc6
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
+import org.apache.hadoop.mrunit.types.Pair;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.steps.CuboidReducer;
+import org.apache.kylin.engine.mr.steps.NDCuboidMapper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author George Song (ysong1)
+ */
+public class NDCuboidMapperTest extends LocalFileMetadataTestCase {
+    MapReduceDriver<Text, Text, Text, Text, Text, Text> mapReduceDriver;
+    String localTempDir = System.getProperty("java.io.tmpdir") + File.separator;
+
+    @Before
+    public void setUp() throws Exception {
+        createTestMetadata();
+
+        // hack for distributed cache
+        FileUtils.deleteDirectory(new File("../job/meta"));
+        FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl()), new File("../job/meta"));
+
+        NDCuboidMapper mapper = new NDCuboidMapper();
+        CuboidReducer reducer = new CuboidReducer();
+        mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+        FileUtils.deleteDirectory(new File("../job/meta"));
+    }
+
+    @Test
+    public void testMapReduceWithSlr() throws IOException {
+
+        String cubeName = "test_kylin_cube_with_slr_1_new_segment";
+        String segmentName = "20130331080000_20131212080000";
+        mapReduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+        mapReduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+
+        byte[] key = { 0, 0, 0, 0, 0, 0, 1, -1, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 11, 54, -105, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
+        byte[] value = { 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 1 };
+        Pair<Text, Text> input1 = new Pair<Text, Text>(new Text(key), new Text(value));
+
+        mapReduceDriver.addInput(input1);
+
+        List<Pair<Text, Text>> result = mapReduceDriver.run();
+
+        assertEquals(4, result.size());
+
+        byte[] resultKey = { 0, 0, 0, 0, 0, 0, 1, 127, 49, 48, 48, 48, 48, 48, 48, 48, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 55, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
+        byte[] resultValue = { 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 1 };
+        Pair<Text, Text> output1 = new Pair<Text, Text>(new Text(resultKey), new Text(resultValue));
+
+        // Because decimals are truncated (KYLIN-766), the value will no longer equal resultValue
+        Collection<Text> keys = Collections2.transform(result, new Function<Pair<Text, Text>, Text>() {
+            @Nullable
+            @Override
+            public Text apply(Pair<Text, Text> input) {
+                return input.getFirst();
+            }
+        });
+        assertTrue(keys.contains(output1.getFirst()));
+        assertFalse(result.contains(output1));
+
+        long[] keySet = new long[result.size()];
+
+        System.out.println(Bytes.toLong(new byte[] { 0, 0, 0, 0, 0, 0, 1, -1 }));
+        for (int i = 0; i < result.size(); i++) {
+            byte[] bytes = new byte[result.get(i).getFirst().getLength()];
+            System.arraycopy(result.get(i).getFirst().getBytes(), 0, bytes, 0, result.get(i).getFirst().getLength());
+            System.out.println(Bytes.toLong(bytes));
+            keySet[i] = Bytes.toLong(bytes);
+        }
+
+        // refer to CuboidSchedulerTest.testGetSpanningCuboid()
+        assertArrayEquals(new long[] { 383, 447, 503, 504 }, keySet);
+
+    }
+}

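For context on the numbers asserted above: in the Kylin rowkey layout the first 8 bytes encode the cuboid id, which is why the test reads them back with Bytes.toLong. The mapper input key starts with cuboid 511, and the four output keys decode to its spanning cuboids 383, 447, 503 and 504. A tiny sketch of that decoding (class name hypothetical):

import org.apache.kylin.common.util.Bytes;

public class CuboidIdFromRowkeySketch {

    public static void main(String[] args) {
        // the 8-byte prefix of the mapper input key used in NDCuboidMapperTest above
        byte[] cuboidPrefix = new byte[] { 0, 0, 0, 0, 0, 0, 1, -1 };
        System.out.println(Bytes.toLong(cuboidPrefix));   // prints 511
    }
}
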
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/pom.xml
----------------------------------------------------------------------
diff --git a/job/pom.xml b/job/pom.xml
index cbb0772..3042778 100644
--- a/job/pom.xml
+++ b/job/pom.xml
@@ -104,6 +104,13 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-job</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-storage-hbase</artifactId>
             <type>test-jar</type>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java b/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
deleted file mode 100644
index d24c99c..0000000
--- a/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.MRBatchCubingEngine;
-import org.apache.kylin.engine.mr.MRBatchCubingEngine2;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-
-public class BuildEngineFactory {
-    
-    private static IBatchCubingEngine defaultBatchEngine;
-    
-    public static IBatchCubingEngine defaultBatchEngine() {
-        if (defaultBatchEngine == null) {
-            KylinConfig conf = KylinConfig.getInstanceFromEnv();
-            if (conf.isCubingInMem()) {
-                defaultBatchEngine = new MRBatchCubingEngine2();
-            } else {
-                defaultBatchEngine = new MRBatchCubingEngine();
-            }
-        }
-        return defaultBatchEngine;
-    }
-    
-    /** Build a new cube segment, typically its time range appends to the end of current cube. */
-    public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
-        return defaultBatchEngine().createBatchCubingJob(newSegment, submitter);
-    }
-    
-    /** Merge multiple small segments into a big one. */
-    public static DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
-        return defaultBatchEngine().createBatchMergeJob(mergeSegment, submitter);
-    }
-    
-}

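For orientation, a brief usage sketch of the factory API in the file removed above (this commit relocates it as part of the module split); the cube name, time range and submitter are hypothetical and mirror values used in the tests earlier in this patch:

import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.BuildEngineFactory;
import org.apache.kylin.job.execution.DefaultChainedExecutable;

public class BuildEngineFactoryUsageSketch {

    public static void main(String[] args) throws Exception {
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        CubeManager cubeManager = CubeManager.getInstance(config);

        // hypothetical cube and segment, mirroring the test data in this patch
        CubeInstance cube = cubeManager.getCube("test_kylin_cube_with_slr_ready");
        CubeSegment mergeSegment = cubeManager.mergeSegments(cube, 0L, 1386835200000L, false);

        // the factory picks the in-mem or classic MR engine based on KylinConfig
        DefaultChainedExecutable mergeJob = BuildEngineFactory.createBatchMergeJob(mergeSegment, "ADMIN");
        System.out.println("created merge job " + mergeJob.getId());
    }
}
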
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java b/job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
deleted file mode 100644
index 904f557..0000000
--- a/job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine;
-
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-
-public interface IBatchCubingEngine {
-
-    /** Build a new cube segment, typically its time range appends to the end of current cube. */
-    public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter);
-    
-    /** Merge multiple small segments into a big one. */
-    public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter);
-    
-    public Class<?> getSourceInterface();
-    
-    public Class<?> getStorageInterface();
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java b/job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java
deleted file mode 100644
index 0359ce9..0000000
--- a/job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package org.apache.kylin.engine;
-
-import org.apache.kylin.cube.CubeSegment;
-
-public interface IStreamingCubingEngine {
-
-    public Runnable createStreamingCubingBuilder(CubeSegment seg);
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
deleted file mode 100644
index a39ac74..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
-import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
-import org.apache.kylin.job.common.MapReduceExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.hadoop.cube.BaseCuboidJob;
-import org.apache.kylin.job.hadoop.cube.NDCuboidJob;
-
-public class BatchCubingJobBuilder extends JobBuilderSupport {
-
-    private final IMRBatchCubingInputSide inputSide;
-    private final IMRBatchCubingOutputSide outputSide;
-
-    public BatchCubingJobBuilder(CubeSegment newSegment, String submitter) {
-        super(newSegment, submitter);
-        this.inputSide = MRUtil.getBatchCubingInputSide(seg);
-        this.outputSide = MRUtil.getBatchCubingOutputSide(seg);
-    }
-
-    public CubingJob build() {
-        final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
-        final String jobId = result.getId();
-        final String cuboidRootPath = getCuboidRootPath(jobId);
-
-        // Phase 1: Create Flat Table
-        inputSide.addStepPhase1_CreateFlatTable(result);
-
-        // Phase 2: Build Dictionary
-        result.addTask(createFactDistinctColumnsStep(jobId));
-        result.addTask(createBuildDictionaryStep(jobId));
-
-        // Phase 3: Build Cube
-        final int groupRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getNCuboidBuildLevels();
-        final int totalRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
-        final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount);
-        // base cuboid step
-        result.addTask(createBaseCuboidStep(cuboidOutputTempPath, jobId));
-        // n dim cuboid steps
-        for (int i = 1; i <= groupRowkeyColumnsCount; i++) {
-            int dimNum = totalRowkeyColumnsCount - i;
-            result.addTask(createNDimensionCuboidStep(cuboidOutputTempPath, dimNum, totalRowkeyColumnsCount));
-        }
-        outputSide.addStepPhase3_BuildCube(result, cuboidRootPath);
-
-        // Phase 4: Update Metadata & Cleanup
-        result.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
-        inputSide.addStepPhase4_Cleanup(result);
-        outputSide.addStepPhase4_Cleanup(result);
-
-        return result;
-    }
-
-    private MapReduceExecutable createBaseCuboidStep(String[] cuboidOutputTempPath, String jobId) {
-        // base cuboid job
-        MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
-
-        StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, seg);
-
-        baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID);
-
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", ""); // marks flat table input
-        appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]);
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "level", "0");
-
-        baseCuboidStep.setMapReduceParams(cmd.toString());
-        baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class);
-        baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
-        return baseCuboidStep;
-    }
-
-    private MapReduceExecutable createNDimensionCuboidStep(String[] cuboidOutputTempPath, int dimNum, int totalRowkeyColumnCount) {
-        // ND cuboid job
-        MapReduceExecutable ndCuboidStep = new MapReduceExecutable();
-
-        ndCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_N_D_CUBOID + " : " + dimNum + "-Dimension");
-        StringBuilder cmd = new StringBuilder();
-
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]);
-        appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]);
-        appendExecCmdParameters(cmd, "jobname", "Kylin_ND-Cuboid_Builder_" + seg.getCubeInstance().getName() + "_Step");
-        appendExecCmdParameters(cmd, "level", "" + (totalRowkeyColumnCount - dimNum));
-
-        ndCuboidStep.setMapReduceParams(cmd.toString());
-        ndCuboidStep.setMapReduceJobClass(NDCuboidJob.class);
-        return ndCuboidStep;
-    }
-
-    private String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
-        String[] paths = new String[groupRowkeyColumnsCount + 1];
-        for (int i = 0; i <= groupRowkeyColumnsCount; i++) {
-            int dimNum = totalRowkeyColumnCount - i;
-            if (dimNum == totalRowkeyColumnCount) {
-                paths[i] = cuboidRootPath + "base_cuboid";
-            } else {
-                paths[i] = cuboidRootPath + dimNum + "d_cuboid";
-            }
-        }
-        return paths;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
deleted file mode 100644
index b6f264e..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
-import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
-import org.apache.kylin.engine.mr.steps.InMemCuboidJob;
-import org.apache.kylin.engine.mr.steps.SaveStatisticsStep;
-import org.apache.kylin.job.common.MapReduceExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-
-public class BatchCubingJobBuilder2 extends JobBuilderSupport {
-
-    private final IMRBatchCubingInputSide inputSide;
-    private final IMRBatchCubingOutputSide2 outputSide;
-
-    public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
-        super(newSegment, submitter);
-        this.inputSide = MRUtil.getBatchCubingInputSide(seg);
-        this.outputSide = MRUtil.getBatchCubingOutputSide2(seg);
-    }
-
-    public CubingJob build() {
-        final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
-        final String jobId = result.getId();
-
-        // Phase 1: Create Flat Table
-        inputSide.addStepPhase1_CreateFlatTable(result);
-
-        // Phase 2: Build Dictionary
-        result.addTask(createFactDistinctColumnsStepWithStats(jobId));
-        result.addTask(createBuildDictionaryStep(jobId));
-        result.addTask(createSaveStatisticsStep(jobId));
-        outputSide.addStepPhase2_BuildDictionary(result);
-
-        // Phase 3: Build Cube
-        result.addTask(createInMemCubingStep(jobId));
-        outputSide.addStepPhase3_BuildCube(result);
-
-        // Phase 4: Update Metadata & Cleanup
-        result.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
-        inputSide.addStepPhase4_Cleanup(result);
-        outputSide.addStepPhase4_Cleanup(result);
-
-        return result;
-    }
-
-    private SaveStatisticsStep createSaveStatisticsStep(String jobId) {
-        SaveStatisticsStep result = new SaveStatisticsStep();
-        result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
-        result.setCubeName(seg.getCubeInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setStatisticsPath(getStatisticsPath(jobId));
-        return result;
-    }
-
-    private MapReduceExecutable createInMemCubingStep(String jobId) {
-        // in-mem cube build job
-        MapReduceExecutable cubeStep = new MapReduceExecutable();
-
-        StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, seg);
-
-        cubeStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
-
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "jobflowid", jobId);
-
-        cubeStep.setMapReduceParams(cmd.toString());
-        cubeStep.setMapReduceJobClass(InMemCuboidJob.class);
-        cubeStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
-        return cubeStep;
-    }
-
-}
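
As a hedged usage sketch (only classes visible in this diff are referenced; the caller class name is hypothetical, and submission of the job through the scheduler is not shown):

import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
import org.apache.kylin.engine.mr.CubingJob;

// Hypothetical caller, for illustration only.
public class BuildJobSketch {

    // Builds (but does not submit) the chained build job for a new segment.
    // The builder wires the four phases in order:
    // flat table -> dictionary & statistics -> in-mem cube -> metadata update / cleanup.
    public static CubingJob buildFor(CubeSegment newSegment, String submitter) {
        return new BatchCubingJobBuilder2(newSegment, submitter).build();
    }
}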

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
deleted file mode 100644
index 6264ebd..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import java.util.List;
-
-import org.apache.kylin.common.util.StringUtil;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
-import org.apache.kylin.job.common.MapReduceExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.hadoop.cube.MergeCuboidJob;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-public class BatchMergeJobBuilder extends JobBuilderSupport {
-
-    private final IMRBatchMergeOutputSide outputSide;
-
-    public BatchMergeJobBuilder(CubeSegment mergeSegment, String submitter) {
-        super(mergeSegment, submitter);
-        this.outputSide = MRUtil.getBatchMergeOutputSide(seg);
-    }
-
-    public CubingJob build() {
-        final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
-        final String jobId = result.getId();
-        final String cuboidRootPath = getCuboidRootPath(jobId);
-
-        final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
-        Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge");
-        final List<String> mergingSegmentIds = Lists.newArrayList();
-        final List<String> mergingCuboidPaths = Lists.newArrayList();
-        for (CubeSegment merging : mergingSegments) {
-            mergingSegmentIds.add(merging.getUuid());
-            mergingCuboidPaths.add(getCuboidRootPath(merging) + "*");
-        }
-
-        // Phase 1: Merge Dictionary
-        result.addTask(createMergeDictionaryStep(mergingSegmentIds));
-
-        // Phase 2: Merge Cube Files
-        String formattedPath = StringUtil.join(mergingCuboidPaths, ",");
-        result.addTask(createMergeCuboidDataStep(seg, formattedPath, cuboidRootPath));
-        outputSide.addStepPhase2_BuildCube(result, cuboidRootPath);
-
-        // Phase 3: Update Metadata & Cleanup
-        result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
-        outputSide.addStepPhase3_Cleanup(result);
-
-        return result;
-    }
-
-    private MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, String inputPath, String outputPath) {
-        MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
-        mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
-        StringBuilder cmd = new StringBuilder();
-
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", inputPath);
-        appendExecCmdParameters(cmd, "output", outputPath);
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
-
-        mergeCuboidDataStep.setMapReduceParams(cmd.toString());
-        mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidJob.class);
-        return mergeCuboidDataStep;
-    }
-
-}
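
The "input" parameter assembled in build() above is just a comma-separated list of per-segment cuboid globs; a hedged sketch of that string manipulation (hypothetical class name, using only utilities already imported by this file):

import java.util.List;

import org.apache.kylin.common.util.StringUtil;

import com.google.common.collect.Lists;

// Hypothetical helper, for illustration only.
public class MergeInputPathSketch {

    // Each merging segment contributes its cuboid root plus a trailing wildcard;
    // the globs are then joined with commas into a single MR input argument.
    public static String toMergeInput(List<String> cuboidRootPaths) {
        List<String> globs = Lists.newArrayList();
        for (String root : cuboidRootPaths) {
            globs.add(root + "*");
        }
        return StringUtil.join(globs, ",");
    }
}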

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
deleted file mode 100644
index e0fc438..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.kylin.common.util.StringUtil;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.steps.MergeCuboidFromStorageJob;
-import org.apache.kylin.engine.mr.steps.MergeStatisticsStep;
-import org.apache.kylin.job.common.MapReduceExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-
-import java.util.List;
-
-public class BatchMergeJobBuilder2 extends JobBuilderSupport {
-
-    private final IMROutput2.IMRBatchMergeOutputSide2 outputSide;
-    
-    public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
-        super(mergeSegment, submitter);
-        this.outputSide = MRUtil.getBatchMergeOutputSide2(seg);
-    }
-
-    public CubingJob build() {
-        final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
-        final String jobId = result.getId();
-
-        final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
-        Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge");
-        final List<String> mergingSegmentIds = Lists.newArrayList();
-        final List<String> mergingHTables = Lists.newArrayList();
-        for (CubeSegment merging : mergingSegments) {
-            mergingSegmentIds.add(merging.getUuid());
-            mergingHTables.add(merging.getStorageLocationIdentifier());
-        }
-
-        // Phase 1: Merge Dictionary
-        result.addTask(createMergeDictionaryStep(mergingSegmentIds));
-        result.addTask(createMergeStatisticsStep(seg, mergingSegmentIds, getStatisticsPath(jobId)));
-        outputSide.addStepPhase1_MergeDictionary(result);
-
-        // Phase 2: Merge Cube
-        String formattedTables = StringUtil.join(mergingHTables, ",");
-        result.addTask(createMergeCuboidDataFromStorageStep(formattedTables, jobId));
-        outputSide.addStepPhase2_BuildCube(result);
-
-        // Phase 3: Update Metadata & Cleanup
-        result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
-        outputSide.addStepPhase3_Cleanup(result);
-
-        return result;
-    }
-
-    private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
-        MergeStatisticsStep result = new MergeStatisticsStep();
-        result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
-        result.setCubeName(seg.getCubeInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setMergingSegmentIds(mergingSegmentIds);
-        result.setMergedStatisticsPath(mergedStatisticsFolder);
-        return result;
-    }
-
-    private MapReduceExecutable createMergeCuboidDataFromStorageStep(String inputTableNames, String jobId) {
-        MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
-        mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
-        StringBuilder cmd = new StringBuilder();
-
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
-        appendExecCmdParameters(cmd, "jobflowid", jobId);
-
-        mergeCuboidDataStep.setMapReduceParams(cmd.toString());
-        mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidFromStorageJob.class);
-        mergeCuboidDataStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
-        return mergeCuboidDataStep;
-    }
-
-}
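
A corresponding hedged sketch for the merge path (hypothetical class name; only classes and methods visible in this diff are called):

import java.util.List;

import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.BatchMergeJobBuilder2;
import org.apache.kylin.engine.mr.CubingJob;

// Hypothetical caller, for illustration only.
public class MergeJobSketch {

    // Builds the chained merge job for an already-created merged segment.
    // The builder itself re-checks that there are at least two segments to merge.
    public static CubingJob mergeFor(CubeSegment mergedSegment, String submitter) {
        List<CubeSegment> merging = mergedSegment.getCubeInstance().getMergingSegments(mergedSegment);
        if (merging.size() < 2) {
            throw new IllegalStateException("nothing to merge for segment " + mergedSegment.getName());
        }
        return new BatchMergeJobBuilder2(mergedSegment, submitter).build();
    }
}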

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java b/job/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
deleted file mode 100644
index 37a8841..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
+++ /dev/null
@@ -1,166 +0,0 @@
-package org.apache.kylin.engine.mr;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.kylin.common.util.Bytes;
-
-public class ByteArrayWritable implements WritableComparable<ByteArrayWritable> {
-
-    private byte[] data;
-    private int offset;
-    private int length;
-
-    public ByteArrayWritable() {
-        this(null, 0, 0);
-    }
-
-    public ByteArrayWritable(int capacity) {
-        this(new byte[capacity], 0, capacity);
-    }
-
-    public ByteArrayWritable(byte[] data) {
-        this(data, 0, data == null ? 0 : data.length);
-    }
-
-    public ByteArrayWritable(byte[] data, int offset, int length) {
-        this.data = data;
-        this.offset = offset;
-        this.length = length;
-    }
-
-    public byte[] array() {
-        return data;
-    }
-
-    public int offset() {
-        return offset;
-    }
-
-    public int length() {
-        return length;
-    }
-
-    public void set(byte[] array) {
-        set(array, 0, array.length);
-    }
-
-    public void set(byte[] array, int offset, int length) {
-        this.data = array;
-        this.offset = offset;
-        this.length = length;
-    }
-
-    public ByteBuffer asBuffer() {
-        if (data == null)
-            return null;
-        else if (offset == 0 && length == data.length)
-            return ByteBuffer.wrap(data);
-        else
-            return ByteBuffer.wrap(data, offset, length).slice();
-    }
-
-    @Override
-    public int hashCode() {
-        if (data == null)
-            return 0;
-        else
-            return Bytes.hashCode(data, offset, length);
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-        out.writeInt(this.length);
-        out.write(this.data, this.offset, this.length);
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-        this.length = in.readInt();
-        this.data = new byte[this.length];
-        in.readFully(this.data, 0, this.length);
-        this.offset = 0;
-    }
-
-    // Below methods copied from BytesWritable
-    /**
-     * Define the sort order of this ByteArrayWritable.
-     * @param that The other ByteArrayWritable to compare against
-     * @return Positive if left is bigger than right, 0 if they are equal, and
-     *         negative if left is smaller than right.
-     */
-    public int compareTo(ByteArrayWritable that) {
-        return WritableComparator.compareBytes(this.data, this.offset, this.length, that.data, that.offset, that.length);
-    }
-
-    /**
-     * Compares the bytes in this object to the specified byte array
-     * @param that The byte array to compare against
-     * @return Positive if left is bigger than right, 0 if they are equal, and
-     *         negative if left is smaller than right.
-     */
-    public int compareTo(final byte[] that) {
-        return WritableComparator.compareBytes(this.data, this.offset, this.length, that, 0, that.length);
-    }
-
-    /**
-     * @see java.lang.Object#equals(java.lang.Object)
-     */
-    @Override
-    public boolean equals(Object right_obj) {
-        if (right_obj instanceof byte[]) {
-            return compareTo((byte[]) right_obj) == 0;
-        }
-        if (right_obj instanceof ByteArrayWritable) {
-            return compareTo((ByteArrayWritable) right_obj) == 0;
-        }
-        return false;
-    }
-
-    /**
-     * @see java.lang.Object#toString()
-     */
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder(3 * this.length);
-        final int endIdx = this.offset + this.length;
-        for (int idx = this.offset; idx < endIdx; idx++) {
-            sb.append(' ');
-            String num = Integer.toHexString(0xff & this.data[idx]);
-            // if it is only one digit, add a leading 0.
-            if (num.length() < 2) {
-                sb.append('0');
-            }
-            sb.append(num);
-        }
-        return sb.length() > 0 ? sb.substring(1) : "";
-    }
-
-    /** A Comparator optimized for ByteArrayWritable.
-     */
-    public static class Comparator extends WritableComparator {
-        private BytesWritable.Comparator comparator = new BytesWritable.Comparator();
-
-        /** constructor */
-        public Comparator() {
-            super(ByteArrayWritable.class);
-        }
-
-        /**
-         * @see org.apache.hadoop.io.WritableComparator#compare(byte[], int, int, byte[], int, int)
-         */
-        @Override
-        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-            return comparator.compare(b1, s1, l1, b2, s2, l2);
-        }
-    }
-
-    static { // register this comparator
-        WritableComparator.define(ByteArrayWritable.class, new Comparator());
-    }
-}
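
A hedged round-trip sketch of the serialization above (hypothetical class name; the expected output follows from write(), readFields() and toString() as defined in this file):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.kylin.engine.mr.ByteArrayWritable;

// Hypothetical test driver, for illustration only.
public class ByteArrayWritableRoundTrip {

    public static void main(String[] args) throws IOException {
        ByteArrayWritable original = new ByteArrayWritable(new byte[] { 1, 2, 3 });

        // write() emits the length as an int followed by the raw bytes
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // readFields() restores length and data, and resets the offset to 0
        ByteArrayWritable copy = new ByteArrayWritable();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(original.equals(copy)); // true: byte-wise comparison
        System.out.println(copy);                  // "01 02 03" (space-separated hex)
    }
}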

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/job/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
deleted file mode 100644
index 77cbab7..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.TimeZone;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.common.MapReduceExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.job.execution.Output;
-
-/**
- */
-public class CubingJob extends DefaultChainedExecutable {
-
-    // KEYS of Output.extraInfo map, info passed across job steps
-    public static final String SOURCE_RECORD_COUNT = "sourceRecordCount";
-    public static final String SOURCE_SIZE_BYTES = "sourceSizeBytes";
-    public static final String CUBE_SIZE_BYTES = "byteSizeBytes";
-    public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";
-    
-    private static final String CUBE_INSTANCE_NAME = "cubeName";
-    private static final String SEGMENT_ID = "segmentId";
-    
-    public static CubingJob createBuildJob(CubeSegment seg, String submitter, JobEngineConfig config) {
-        return initCubingJob(seg, "BUILD", submitter, config);
-    }
-    
-    public static CubingJob createMergeJob(CubeSegment seg, String submitter, JobEngineConfig config) {
-        return initCubingJob(seg, "MERGE", submitter, config);
-    }
-    
-    private static CubingJob initCubingJob(CubeSegment seg, String jobType, String submitter, JobEngineConfig config) {
-        CubingJob result = new CubingJob();
-        SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss");
-        format.setTimeZone(TimeZone.getTimeZone(config.getTimeZone()));
-        result.setCubeName(seg.getCubeInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setName(seg.getCubeInstance().getName() + " - " + seg.getName() + " - " + jobType + " - " + format.format(new Date(System.currentTimeMillis())));
-        result.setSubmitter(submitter);
-        result.setNotifyList(seg.getCubeInstance().getDescriptor().getNotifyList());
-        return result;
-    }
-
-    public CubingJob() {
-        super();
-    }
-    
-    void setCubeName(String name) {
-        setParam(CUBE_INSTANCE_NAME, name);
-    }
-
-    public String getCubeName() {
-        return getParam(CUBE_INSTANCE_NAME);
-    }
-
-    void setSegmentId(String segmentId) {
-        setParam(SEGMENT_ID, segmentId);
-    }
-
-    public String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    @Override
-    protected Pair<String, String> formatNotifications(ExecutableContext context, ExecutableState state) {
-        CubeInstance cubeInstance = CubeManager.getInstance(context.getConfig()).getCube(getCubeName());
-        final Output output = jobService.getOutput(getId());
-        String logMsg;
-        state = output.getState();
-        if (state != ExecutableState.ERROR &&
-                !cubeInstance.getDescriptor().getStatusNeedNotify().contains(state.toString().toLowerCase())) {
-            logger.info("state:" + state + " no need to notify users");
-            return null;
-        }
-        switch (state) {
-            case ERROR:
-                logMsg = output.getVerboseMsg();
-                break;
-            case DISCARDED:
-                logMsg = "job has been discarded";
-                break;
-            case SUCCEED:
-                logMsg = "job has succeeded";
-                break;
-            default:
-                return null;
-        }
-        if (logMsg == null) {
-            logMsg = "no error message";
-        }
-        String content = ExecutableConstants.NOTIFY_EMAIL_TEMPLATE;
-        content = content.replaceAll("\\$\\{job_name\\}", getName());
-        content = content.replaceAll("\\$\\{result\\}", state.toString());
-        content = content.replaceAll("\\$\\{cube_name\\}", getCubeName());
-        content = content.replaceAll("\\$\\{start_time\\}", new Date(getStartTime()).toString());
-        content = content.replaceAll("\\$\\{duration\\}", getDuration() / 60000 + "mins");
-        content = content.replaceAll("\\$\\{mr_waiting\\}", getMapReduceWaitTime() / 60000 + "mins");
-        content = content.replaceAll("\\$\\{last_update_time\\}", new Date(getLastModified()).toString());
-        content = content.replaceAll("\\$\\{submitter\\}", getSubmitter());
-        content = content.replaceAll("\\$\\{error_log\\}", logMsg);
-
-        try {
-            InetAddress inetAddress = InetAddress.getLocalHost();
-            content = content.replaceAll("\\$\\{job_engine\\}", inetAddress.getCanonicalHostName());
-        } catch (UnknownHostException e) {
-            logger.warn(e.getLocalizedMessage(), e);
-        }
-
-        String title = "["+ state.toString() + "] - [Kylin Cube Build Job]-" + getCubeName();
-        return Pair.of(title, content);
-    }
-
-    @Override
-    protected void onExecuteFinished(ExecuteResult result, ExecutableContext executableContext) {
-        long time = 0L;
-        for (AbstractExecutable task: getTasks()) {
-            final ExecutableState status = task.getStatus();
-            if (status != ExecutableState.SUCCEED) {
-                break;
-            }
-            if (task instanceof MapReduceExecutable) {
-                time += ((MapReduceExecutable) task).getMapReduceWaitTime();
-            }
-        }
-        setMapReduceWaitTime(time);
-        super.onExecuteFinished(result, executableContext);
-    }
-
-    public long getMapReduceWaitTime() {
-        return getExtraInfoAsLong(MAP_REDUCE_WAIT_TIME, 0L);
-    }
-
-    public void setMapReduceWaitTime(long t) {
-        addExtraInfo(MAP_REDUCE_WAIT_TIME, t + "");
-    }
-    
-    public long findSourceRecordCount() {
-        return Long.parseLong(findExtraInfo(SOURCE_RECORD_COUNT, "0"));
-    }
-    
-    public long findSourceSizeBytes() {
-        return Long.parseLong(findExtraInfo(SOURCE_SIZE_BYTES, "0"));
-    }
-    
-    public long findCubeSizeBytes() {
-        return Long.parseLong(findExtraInfo(CUBE_SIZE_BYTES, "0"));
-    }
-    
-    private String findExtraInfo(String key, String dft) {
-        for (AbstractExecutable child : getTasks()) {
-            Output output = executableManager.getOutput(child.getId());
-            String value = output.getExtra().get(key);
-            if (value != null)
-                return value;
-        }
-        return dft;
-    }
-}
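
A hedged sketch of how the extra-info accessors above are typically read back once a job has finished (hypothetical class name; obtaining the finished CubingJob from the job service is not shown):

import org.apache.kylin.engine.mr.CubingJob;

// Hypothetical reporting helper, for illustration only.
public class CubingJobReportSketch {

    // Each find* call scans the step outputs for the first one carrying the key,
    // falling back to 0 when no step recorded it.
    public static String summarize(CubingJob job) {
        return String.format("records=%d, sourceBytes=%d, cubeBytes=%d, mrWaitMins=%d",
                job.findSourceRecordCount(),
                job.findSourceSizeBytes(),
                job.findCubeSizeBytes(),
                job.getMapReduceWaitTime() / 60000);
    }
}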

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
deleted file mode 100644
index 0c39398..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.model.TableDesc;
-
-/**
- * Any ITableSource that wishes to serve as input of the MapReduce build engine must adapt to this interface.
- */
-public interface IMRInput {
-
-    /** Return a helper to participate in batch cubing job flow. */
-    public IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg);
-
-    /** Return an InputFormat that reads from the specified table. */
-    public IMRTableInputFormat getTableInputFormat(TableDesc table);
-    
-    /**
-     * Utility that configures a mapper to read from a table.
-     */
-    public interface IMRTableInputFormat {
-        
-        /** Configure the InputFormat of the given job. */
-        public void configureJob(Job job);
-        
-        /** Parse a mapper input object into column values. */
-        public String[] parseMapperInput(Object mapperInput);
-    }
-    
-    /**
-     * Participates in the batch cubing flow as the input side. Responsible for creating
-     * the intermediate flat table (Phase 1) and cleaning up any leftovers (Phase 4).
-     * 
-     * - Phase 1: Create Flat Table
-     * - Phase 2: Build Dictionary (with FlatTableInputFormat)
-     * - Phase 3: Build Cube (with FlatTableInputFormat)
-     * - Phase 4: Update Metadata & Cleanup
-     */
-    public interface IMRBatchCubingInputSide {
-        
-        /** Return an InputFormat that reads from the intermediate flat table */
-        public IMRTableInputFormat getFlatTableInputFormat();
-        
-        /** Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc */
-        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
-        
-        /** Add step that does necessary clean up, like delete the intermediate flat table */
-        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
-    }
-}
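
To make the contract above concrete, a hedged skeleton implementation (hypothetical class name; it pretends the flat table already exists, so Phases 1 and 4 add no steps):

import org.apache.hadoop.mapreduce.Job;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.metadata.model.TableDesc;

// Hypothetical no-op input side, for illustration only.
public class NoOpMRInput implements IMRInput {

    @Override
    public IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
        return new IMRBatchCubingInputSide() {
            @Override
            public IMRTableInputFormat getFlatTableInputFormat() {
                return getTableInputFormat(null);
            }

            @Override
            public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
                // nothing to create in this sketch
            }

            @Override
            public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
                // nothing to clean up in this sketch
            }
        };
    }

    @Override
    public IMRTableInputFormat getTableInputFormat(TableDesc table) {
        return new IMRTableInputFormat() {
            @Override
            public void configureJob(Job job) {
                // a real implementation would set the InputFormat class and input paths here
            }

            @Override
            public String[] parseMapperInput(Object mapperInput) {
                // assume the mapper value is a comma-delimited line of text
                return mapperInput.toString().split(",");
            }
        };
    }
}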