You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/24 01:20:14 UTC

[06/28] incubator-kylin git commit: KYLIN-875 Split job module into 'core-job', 'engine-mr', 'source-hive', 'storage-hbase'. The old job remains as an assembly project.

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/hadoop/cube/RandomKeyDistributionReducerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/RandomKeyDistributionReducerTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/RandomKeyDistributionReducerTest.java
deleted file mode 100644
index 8e85a27..0000000
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/RandomKeyDistributionReducerTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import static org.junit.Assert.*;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
-import org.apache.hadoop.mrunit.types.Pair;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.invertedindex.RandomKeyDistributionReducer;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * @author ysong1
- * 
- */
-public class RandomKeyDistributionReducerTest {
-    ReduceDriver<Text, LongWritable, Text, LongWritable> reduceDriver;
-
-    @Before
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    public void setUp() {
-        RandomKeyDistributionReducer reducer = new RandomKeyDistributionReducer();
-        reduceDriver = ReduceDriver.newReduceDriver(reducer);
-    }
-
-    @Test
-    public void test() throws IOException {
-        List<Text> data = new ArrayList<Text>();
-        for (int i = 0; i < 1001; i++) {
-            data.add(new Text(String.valueOf(i)));
-        }
-        for (Text t : data) {
-            reduceDriver.addInput(t, new ArrayList<LongWritable>());
-        }
-
-        reduceDriver.getConfiguration().set(BatchConstants.REGION_NUMBER, "2");
-        List<Pair<Text, LongWritable>> result = reduceDriver.run();
-
-        assertEquals(2, result.size());
-
-        for (Pair<Text, LongWritable> p : result) {
-            System.out.println(p.getFirst());
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJobTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJobTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJobTest.java
deleted file mode 100644
index b3bcc30..0000000
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJobTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import static org.junit.Assert.*;
-
-import java.io.File;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.util.ToolRunner;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-
-/**
- * @author ysong1
- * 
- */
-public class RangeKeyDistributionJobTest extends LocalFileMetadataTestCase {
-
-    private Configuration conf;
-
-    @Before
-    public void setup() throws Exception {
-        conf = new Configuration();
-        conf.set("fs.default.name", "file:///");
-        conf.set("mapred.job.tracker", "local");
-
-        // for local runner out-of-memory issue
-        conf.set("mapreduce.task.io.sort.mb", "10");
-        createTestMetadata();
-    }
-
-    @After
-    public void after() throws Exception {
-        cleanupTestMetadata();
-    }
-
-    @Test
-    public void testJob() throws Exception {
-        String input = "src/test/resources/data/base_cuboid/,src/test/resources/data/6d_cuboid/";
-        String output = "target/test-output/key_distribution_range/";
-        String jobname = "calculate_splits";
-        String cubename = "test_kylin_cube_with_slr_ready";
-
-        FileUtil.fullyDelete(new File(output));
-
-        String[] args = { "-input", input, "-output", output, "-jobname", jobname, "-cubename", cubename };
-        assertEquals("Job failed", 0, ToolRunner.run(conf, new RangeKeyDistributionJob(), args));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapperTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapperTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapperTest.java
deleted file mode 100644
index 550fd6b..0000000
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapperTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mrunit.mapreduce.MapDriver;
-import org.apache.hadoop.mrunit.types.Pair;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * @author ysong1
- * 
- */
-public class RangeKeyDistributionMapperTest {
-
-    @SuppressWarnings("rawtypes")
-    MapDriver mapDriver;
-    String localTempDir = System.getProperty("java.io.tmpdir") + File.separator;
-
-    @Before
-    public void setUp() {
-        RangeKeyDistributionMapper mapper = new RangeKeyDistributionMapper();
-        mapDriver = MapDriver.newMapDriver(mapper);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testMapperWithoutHeader() throws IOException {
-
-        Text inputKey1 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
-        Text inputKey2 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 122, 1, 0, 22, 98, 1, 0, 121, 7 });
-        Text inputKey3 = new Text(new byte[] { 2, 2, 2, 2, 2, 2, 2, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
-        Text inputKey4 = new Text(new byte[] { 3, 3, 3, 3, 3, 3, 3, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
-        Text inputKey5 = new Text(new byte[] { 4, 4, 4, 4, 4, 4, 4, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
-        Text inputKey6 = new Text(new byte[] { 5, 5, 5, 5, 5, 5, 5, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
-        Text inputKey7 = new Text(new byte[] { 6, 6, 6, 6, 6, 6, 6, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
-
-        mapDriver.addInput(inputKey1, new Text("abc"));
-        mapDriver.addInput(inputKey2, new Text("abc"));
-        mapDriver.addInput(inputKey3, new Text("abc"));
-        mapDriver.addInput(inputKey4, new Text("abc"));
-        mapDriver.addInput(inputKey5, new Text("abc"));
-        mapDriver.addInput(inputKey6, new Text("abc"));
-        mapDriver.addInput(inputKey7, new Text("abc"));
-
-        List<Pair<Text, LongWritable>> result = mapDriver.run();
-
-        assertEquals(1, result.size());
-
-        byte[] key1 = result.get(0).getFirst().getBytes();
-        LongWritable value1 = result.get(0).getSecond();
-        assertArrayEquals(new byte[] { 6, 6, 6, 6, 6, 6, 6, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 }, key1);
-        assertEquals(147, value1.get());
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testMapperWithHeader() throws IOException {
-
-        Text inputKey1 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
-        Text inputKey2 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 0, 0, 0, 0, 0, 0, 0, 127, 11, 122, 1, 0, 22, 98, 1, 0, 121, 7 });
-        Text inputKey3 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 2, 2, 2, 2, 2, 2, 2, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
-        Text inputKey4 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 3, 3, 3, 3, 3, 3, 3, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
-        Text inputKey5 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 4, 4, 4, 4, 4, 4, 4, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
-        Text inputKey6 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 5, 5, 5, 5, 5, 5, 5, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
-        Text inputKey7 = new Text(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 6, 6, 6, 6, 6, 6, 6, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 });
-
-        mapDriver.addInput(inputKey1, new Text("abc"));
-        mapDriver.addInput(inputKey2, new Text("abc"));
-        mapDriver.addInput(inputKey3, new Text("abc"));
-        mapDriver.addInput(inputKey4, new Text("abc"));
-        mapDriver.addInput(inputKey5, new Text("abc"));
-        mapDriver.addInput(inputKey6, new Text("abc"));
-        mapDriver.addInput(inputKey7, new Text("abc"));
-
-        List<Pair<Text, LongWritable>> result = mapDriver.run();
-
-        assertEquals(1, result.size());
-
-        byte[] key1 = result.get(0).getFirst().getBytes();
-        LongWritable value1 = result.get(0).getSecond();
-        assertArrayEquals(new byte[] { 0, 0, 0, 0, 0, 0, 0, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7, 6, 6, 6, 6, 6, 6, 6, 127, 11, 56, -23, 0, 22, 98, 1, 0, 121, 7 }, key1);
-        assertEquals(273, value1.get());
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducerTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducerTest.java
deleted file mode 100644
index 38fb132..0000000
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducerTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * @author ysong1
- * 
- */
-public class RangeKeyDistributionReducerTest {
-
-    ReduceDriver<Text, LongWritable, Text, LongWritable> reduceDriver;
-    String localTempDir = System.getProperty("java.io.tmpdir") + File.separator;
-
-    @Before
-    public void setUp() {
-        RangeKeyDistributionReducer reducer = new RangeKeyDistributionReducer();
-        reduceDriver = ReduceDriver.newReduceDriver(reducer);
-    }
-
-    @Test
-    public void testReducer() throws IOException {
-        // TODO
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/CubeSamplingTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/CubeSamplingTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/CubeSamplingTest.java
deleted file mode 100644
index 386d858..0000000
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/CubeSamplingTest.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cubev2;
-
-import com.google.common.collect.Lists;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-import org.apache.commons.lang.RandomStringUtils;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.List;
-
-/**
- */
-public class CubeSamplingTest {
-
-    private static final int ROW_LENGTH = 10;
-
-    private final List<String> row = new ArrayList<String>(ROW_LENGTH);
-    private final ByteArray[] row_index = new ByteArray[ROW_LENGTH];
-
-    private Integer[][] allCuboidsBitSet;
-    private HashFunction hf = null;
-    private long baseCuboidId;
-    private HyperLogLogPlusCounter[] allCuboidsHLL = null;
-    private final byte[] seperator = Bytes.toBytes(",");
-
-    @Before
-    public void setup() {
-
-        baseCuboidId = (1l << ROW_LENGTH) - 1;
-        List<Long> allCuboids = Lists.newArrayList();
-        List<Integer[]> allCuboidsBitSetList = Lists.newArrayList();
-        for (long i = 1; i < baseCuboidId; i++) {
-            allCuboids.add(i);
-            addCuboidBitSet(i, allCuboidsBitSetList);
-        }
-
-        allCuboidsBitSet = allCuboidsBitSetList.toArray(new Integer[allCuboidsBitSetList.size()][]);
-        System.out.println("Totally have " + allCuboidsBitSet.length + " cuboids.");
-        allCuboidsHLL = new HyperLogLogPlusCounter[allCuboids.size()];
-        for (int i = 0; i < allCuboids.size(); i++) {
-            allCuboidsHLL[i] = new HyperLogLogPlusCounter(14);
-        }
-
-        //  hf = Hashing.goodFastHash(32);
-//        hf = Hashing.md5();
-        hf = Hashing.murmur3_32();
-
-        for (int i = 0; i < ROW_LENGTH; i++) {
-            row_index[i] = new ByteArray();
-        }
-    }
-
-    private void addCuboidBitSet(long cuboidId, List<Integer[]> allCuboidsBitSet) {
-        BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
-        Integer[] indice = new Integer[bitSet.cardinality()];
-
-        long mask = Long.highestOneBit(baseCuboidId);
-        int position = 0;
-        for (int i = 0; i < ROW_LENGTH; i++) {
-            if ((mask & cuboidId) > 0) {
-                indice[position] = i;
-                position++;
-            }
-            mask = mask >> 1;
-        }
-
-        allCuboidsBitSet.add(indice);
-
-    }
-
-    @Test
-    public void test() {
-
-        long start = System.currentTimeMillis();
-        List<String> row;
-        for (int i = 0; i < 10000; i++) {
-            row = getRandomRow();
-            putRowKeyToHLL(row);
-        }
-
-        long duration = System.currentTimeMillis() - start;
-        System.out.println("The test takes " + duration / 1000 + "seconds.");
-    }
-
-    private void putRowKeyToHLL(List<String> row) {
-        int x = 0;
-        for (String field : row) {
-            Hasher hc = hf.newHasher();
-            row_index[x++].set(hc.putString(field).hash().asBytes());
-        }
-
-        for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
-            Hasher hc = hf.newHasher();
-            for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
-                hc.putBytes(row_index[allCuboidsBitSet[i][position]].array());
-                hc.putBytes(seperator);
-            }
-            allCuboidsHLL[i].add(hc.hash().asBytes());
-        }
-    }
-
-    private List<String> getRandomRow() {
-        row.clear();
-        for (int i = 0; i < ROW_LENGTH; i++) {
-            row.add(RandomStringUtils.random(10));
-        }
-        return row;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/FactDistinctColumnsReducerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/FactDistinctColumnsReducerTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/FactDistinctColumnsReducerTest.java
deleted file mode 100644
index b85afcc..0000000
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/FactDistinctColumnsReducerTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package org.apache.kylin.job.hadoop.cubev2;
-
-import com.google.common.collect.Maps;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.UUID;
-
-/**
- */
-public class FactDistinctColumnsReducerTest {
-
-
-    @Test
-    public void testWriteCuboidStatistics() throws IOException {
-
-        final Configuration conf = HadoopUtil.getCurrentConfiguration();
-        final Path outputPath = new Path("file:///tmp/kylin/cuboidstatistics/" + UUID.randomUUID().toString());
-        if (!FileSystem.getLocal(conf).exists(outputPath)) {
-//            FileSystem.getLocal(conf).create(outputPath);
-        }
-
-        System.out.println(outputPath);
-        Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = Maps.newHashMap();
-        FactDistinctColumnsReducer.writeCuboidStatistics(conf, outputPath, cuboidHLLMap, 100);
-
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/hadoop/hbase/CreateHTableTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/hbase/CreateHTableTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/hbase/CreateHTableTest.java
deleted file mode 100644
index 5f43c2a..0000000
--- a/job/src/test/java/org/apache/kylin/job/hadoop/hbase/CreateHTableTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.hbase;
-
-import static org.junit.Assert.*;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-
-/**
- * @author George Song (ysong1)
- * 
- */
-public class CreateHTableTest extends LocalFileMetadataTestCase {
-
-    private Configuration conf;
-
-    @Before
-    public void setup() throws Exception {
-        conf = new Configuration();
-        conf.set("fs.default.name", "file:///");
-        conf.set("mapred.job.tracker", "local");
-        this.createTestMetadata();
-
-    }
-
-    @After
-    public void after() throws Exception {
-        this.cleanupTestMetadata();
-    }
-
-    @Test
-    public void testGetSplits() throws IllegalArgumentException, Exception {
-        CreateHTableJob c = new CreateHTableJob();
-
-        String input = "src/test/resources/partition_list/part-r-00000";
-
-        byte[][] splits = c.getSplits(conf, new Path(input));
-
-        assertEquals(497, splits.length);
-        assertArrayEquals(new byte[] { 0, 0, 0, 0, 0, 0, 15, -1, 11, 51, -45, 2 }, splits[0]);
-        assertArrayEquals(new byte[] { 0, 0, 0, 0, 0, 3, -1, -1, -54, -61, 109, -44, 1 }, splits[496]);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/hadoop/hbase/TestHbaseClient.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/hbase/TestHbaseClient.java b/job/src/test/java/org/apache/kylin/job/hadoop/hbase/TestHbaseClient.java
deleted file mode 100644
index 3232a80..0000000
--- a/job/src/test/java/org/apache/kylin/job/hadoop/hbase/TestHbaseClient.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.hbase;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.kylin.common.util.Bytes;
-
-/**
- */
-public class TestHbaseClient {
-
-    private static boolean reverse = false;
-
-    public static void foo(int n, int k) {
-        int t = k;
-        if (n - k < k) {
-            t = n - k;
-            reverse = true;
-        }
-        boolean[] flags = new boolean[n];
-        inner(flags, 0, t);
-    }
-
-    private static void print(boolean[] flags) {
-        for (int i = 0; i < flags.length; i++) {
-            if (!reverse) {
-                if (flags[i])
-                    System.out.print("0");
-                else
-                    System.out.print("1");
-            } else {
-                if (flags[i])
-                    System.out.print("1");
-                else
-                    System.out.print("0");
-
-            }
-        }
-        System.out.println();
-
-    }
-
-    private static void inner(boolean[] flags, int start, int remaining) {
-        if (remaining <= 0) {
-            print(flags);
-            return;
-        }
-
-        if (flags.length - start < remaining) {
-            return;
-        }
-
-        // write at flags[start]
-        flags[start] = true;
-        inner(flags, start + 1, remaining - 1);
-
-        // not write at flags[start]
-        flags[start] = false;
-        inner(flags, start + 1, remaining);
-    }
-
-    public static void main(String[] args) throws IOException {
-        foo(6, 5);
-        foo(5, 2);
-        foo(3, 0);
-
-        Configuration conf = HBaseConfiguration.create();
-        conf.set("hbase.zookeeper.quorum", "hbase_host");
-        conf.set("zookeeper.znode.parent", "/hbase-unsecure");
-
-        HTable table = new HTable(conf, "test1");
-        Put put = new Put(Bytes.toBytes("row1"));
-
-        put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
-        put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("val2"));
-
-        table.put(put);
-        table.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/hadoop/hdfs/ITHdfsOpsTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/hdfs/ITHdfsOpsTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/hdfs/ITHdfsOpsTest.java
deleted file mode 100644
index 69519e0..0000000
--- a/job/src/test/java/org/apache/kylin/job/hadoop/hdfs/ITHdfsOpsTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.hdfs;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.storage.hbase.HBaseMetadataTestCase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-
-/**
- */
-public class ITHdfsOpsTest extends HBaseMetadataTestCase {
-
-    FileSystem fileSystem;
-
-    @Before
-    public void setup() throws Exception {
-
-        this.createTestMetadata();
-
-        Configuration hconf = new Configuration();
-
-        fileSystem = FileSystem.get(hconf);
-    }
-
-    @Test
-    public void TestPath() throws IOException {
-        String hdfsWorkingDirectory = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory();
-        Path coprocessorDir = new Path(hdfsWorkingDirectory, "test");
-        fileSystem.mkdirs(coprocessorDir);
-
-        Path newFile = new Path(coprocessorDir, "test_file");
-        newFile = newFile.makeQualified(fileSystem.getUri(), null);
-        FSDataOutputStream stream = fileSystem.create(newFile);
-        stream.write(new byte[] { 0, 1, 2 });
-        stream.close();
-    }
-
-    @After
-    public void after() throws Exception {
-        this.cleanupTestMetadata();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/hadoop/hive/JoinedFlatTableTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/hive/JoinedFlatTableTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/hive/JoinedFlatTableTest.java
deleted file mode 100644
index 21f6a71..0000000
--- a/job/src/test/java/org/apache/kylin/job/hadoop/hive/JoinedFlatTableTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.hive;
-
-import static org.junit.Assert.*;
-
-import java.io.IOException;
-
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.job.JoinedFlatTable;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-
-/**
- * @author George Song (ysong1)
- * 
- */
-@Ignore("This test case doesn't have much value, ignore it.")
-public class JoinedFlatTableTest extends LocalFileMetadataTestCase {
-
-    CubeInstance cube = null;
-    CubeJoinedFlatTableDesc intermediateTableDesc = null;
-    String fakeJobUUID = "abc-def";
-    CubeSegment cubeSegment = null;
-
-    @Before
-    public void setUp() throws Exception {
-        this.createTestMetadata();
-        cube = CubeManager.getInstance(getTestConfig()).getCube("test_kylin_cube_with_slr_ready");
-        cubeSegment = cube.getSegments().get(0);
-        intermediateTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), cubeSegment);
-    }
-
-    @After
-    public void after() throws Exception {
-        this.cleanupTestMetadata();
-    }
-
-    @Test
-    public void testGenCreateTableDDL() {
-        String ddl = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, "/tmp");
-        System.out.println(ddl);
-
-        System.out.println("The length for the ddl is " + ddl.length());
-    }
-
-    @Test
-    public void testGenDropTableDDL() {
-        String ddl = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc);
-        System.out.println(ddl);
-        assertEquals(107, ddl.length());
-    }
-
-    @Test
-    public void testGenerateInsertSql() throws IOException {
-        String sqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, new JobEngineConfig(KylinConfig.getInstanceFromEnv()));
-        System.out.println(sqls);
-
-        int length = sqls.length();
-        assertEquals(1155, length);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
index 20208f2..1a503e1 100644
--- a/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
@@ -26,7 +26,7 @@ import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.manager.ExecutableManager;
 import org.apache.kylin.storage.hbase.HBaseMetadataTestCase;
-import org.apache.kylin.storage.hbase.ZookeeperJobLock;
+import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStoreTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStoreTest.java
deleted file mode 100644
index 2fbcd94..0000000
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStoreTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements. See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License. You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.kylin.job.inmemcubing;
-
-import static org.junit.Assert.*;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.kylin.storage.gridtable.GTBuilder;
-import org.apache.kylin.storage.gridtable.GTInfo;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.storage.gridtable.GTScanRequest;
-import org.apache.kylin.storage.gridtable.GridTable;
-import org.apache.kylin.storage.gridtable.IGTScanner;
-import org.apache.kylin.storage.gridtable.UnitTestSupport;
-import org.junit.Test;
-
-public class ConcurrentDiskStoreTest {
-
-    final GTInfo info = UnitTestSupport.advancedInfo();
-    final List<GTRecord> data = UnitTestSupport.mockupData(info, 1000000); // converts to about 34 MB data
-
-    @Test
-    public void testSingleThreadRead() throws IOException, InterruptedException {
-        long start = System.currentTimeMillis();
-        verifyOneTableWriteAndRead(1);
-        long end = System.currentTimeMillis();
-        System.out.println("Cost " + (end - start) + " millis");
-    }
-
-    @Test
-    public void testMultiThreadRead() throws IOException, InterruptedException {
-        long start = System.currentTimeMillis();
-        verifyOneTableWriteAndRead(20);
-        long end = System.currentTimeMillis();
-        System.out.println("Cost " + (end - start) + " millis");
-    }
-    
-    private void verifyOneTableWriteAndRead(int readThreads) throws IOException, InterruptedException {
-        ConcurrentDiskStore store = new ConcurrentDiskStore(info);
-        GridTable table = new GridTable(info, store);
-        verifyWriteAndRead(table, readThreads);
-    }
-
-    private void verifyWriteAndRead(final GridTable table, int readThreads) throws IOException, InterruptedException {
-        GTBuilder builder = table.rebuild();
-        for (GTRecord r : data) {
-            builder.write(r);
-        }
-        builder.close();
-
-        int nThreads = readThreads;
-        Thread[] t = new Thread[nThreads];
-        for (int i = 0; i < nThreads; i++) {
-            t[i] = new Thread() {
-                public void run() {
-                    try {
-                        IGTScanner scanner = table.scan(new GTScanRequest(table.getInfo()));
-                        int i = 0;
-                        for (GTRecord r : scanner) {
-                            assertEquals(data.get(i++), r);
-                        }
-                        scanner.close();
-                    } catch (Exception ex) {
-                        ex.printStackTrace();
-                    }
-                }
-            };
-            t[i].start();
-        }
-        for (int i = 0; i < nThreads; i++) {
-            t[i].join();
-        }
-        
-        ((ConcurrentDiskStore) table.getStore()).close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderStressTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderStressTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderStressTest.java
deleted file mode 100644
index d5563b7..0000000
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderStressTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements. See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License. You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.kylin.job.inmemcubing;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class DoggedCubeBuilderStressTest extends LocalFileMetadataTestCase {
-
-    @SuppressWarnings("unused")
-    private static final Logger logger = LoggerFactory.getLogger(DoggedCubeBuilderStressTest.class);
-
-    // CI sandbox memory is no more than 512MB, this many input should hit memory threshold
-    private static final int INPUT_ROWS = 200000;
-    private static final int THREADS = 4;
-
-    private static CubeInstance cube;
-    private static String flatTable;
-    private static Map<TblColRef, Dictionary<?>> dictionaryMap;
-
-    @BeforeClass
-    public static void before() throws IOException {
-        staticCreateTestMetadata();
-
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
-
-        cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
-        flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
-        dictionaryMap = InMemCubeBuilderTest.getDictionaryMap(cube, flatTable);
-    }
-
-    @AfterClass
-    public static void after() throws Exception {
-        staticCleanupTestMetadata();
-    }
-
-    @Test
-    public void test() throws Exception {
-
-        ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
-        ExecutorService executorService = Executors.newSingleThreadExecutor();
-        long randSeed = System.currentTimeMillis();
-
-        DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
-        doggedBuilder.setConcurrentThreads(THREADS);
-
-        {
-            Future<?> future = executorService.submit(doggedBuilder.buildAsRunnable(queue, new NoopWriter()));
-            InMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
-            future.get();
-        }
-    }
-
-    class NoopWriter implements ICuboidWriter {
-        @Override
-        public void write(long cuboidId, GTRecord record) throws IOException {
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderTest.java
deleted file mode 100644
index a87f950..0000000
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilderTest.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements. See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License. You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.kylin.job.inmemcubing;
-
-import static org.junit.Assert.*;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class DoggedCubeBuilderTest extends LocalFileMetadataTestCase {
-
-    @SuppressWarnings("unused")
-    private static final Logger logger = LoggerFactory.getLogger(DoggedCubeBuilderTest.class);
-
-    private static final int INPUT_ROWS = 10000;
-    private static final int SPLIT_ROWS = 5000;
-    private static final int THREADS = 4;
-
-    private static CubeInstance cube;
-    private static String flatTable;
-    private static Map<TblColRef, Dictionary<?>> dictionaryMap;
-
-    @BeforeClass
-    public static void before() throws IOException {
-        staticCreateTestMetadata();
-
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
-
-        cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
-        flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
-        dictionaryMap = InMemCubeBuilderTest.getDictionaryMap(cube, flatTable);
-    }
-
-    @AfterClass
-    public static void after() throws Exception {
-        staticCleanupTestMetadata();
-    }
-
-    @Test
-    public void test() throws Exception {
-
-        ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
-        ExecutorService executorService = Executors.newSingleThreadExecutor();
-        long randSeed = System.currentTimeMillis();
-
-        DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
-        doggedBuilder.setConcurrentThreads(THREADS);
-        doggedBuilder.setSplitRowThreshold(SPLIT_ROWS);
-        FileRecordWriter doggedResult = new FileRecordWriter();
-
-        {
-            Future<?> future = executorService.submit(doggedBuilder.buildAsRunnable(queue, doggedResult));
-            InMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
-            future.get();
-            doggedResult.close();
-        }
-
-        InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
-        inmemBuilder.setConcurrentThreads(THREADS);
-        FileRecordWriter inmemResult = new FileRecordWriter();
-
-        {
-            Future<?> future = executorService.submit(inmemBuilder.buildAsRunnable(queue, inmemResult));
-            InMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
-            future.get();
-            inmemResult.close();
-        }
-
-        fileCompare(doggedResult.file, inmemResult.file);
-        doggedResult.file.delete();
-        inmemResult.file.delete();
-    }
-
-    private void fileCompare(File file, File file2) throws IOException {
-        BufferedReader r1 = new BufferedReader(new InputStreamReader(new FileInputStream(file), "UTF-8"));
-        BufferedReader r2 = new BufferedReader(new InputStreamReader(new FileInputStream(file2), "UTF-8"));
-
-        String line1, line2;
-        do {
-            line1 = r1.readLine();
-            line2 = r2.readLine();
-            
-            assertEquals(line1, line2);
-            
-        } while (line1 != null || line2 != null);
-
-        r1.close();
-        r2.close();
-    }
-
-    class FileRecordWriter implements ICuboidWriter {
-
-        File file;
-        PrintWriter writer;
-
-        FileRecordWriter() throws IOException {
-            file = File.createTempFile("DoggedCubeBuilderTest_", ".data");
-            writer = new PrintWriter(file, "UTF-8");
-        }
-
-        @Override
-        public void write(long cuboidId, GTRecord record) throws IOException {
-            writer.print(cuboidId);
-            writer.print(", ");
-            writer.print(record.toString());
-            writer.println();
-        }
-
-        public void close() {
-            writer.close();
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
deleted file mode 100644
index 3caa1b0..0000000
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements. See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License. You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.kylin.job.inmemcubing;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.dict.DictionaryGenerator;
-import org.apache.kylin.engine.mr.DFSFileTableReader;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- */
-public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
-
-    private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderTest.class);
-
-    private static final int INPUT_ROWS = 70000;
-    private static final int THREADS = 4;
-    
-    private static CubeInstance cube;
-    private static String flatTable;
-    private static Map<TblColRef, Dictionary<?>> dictionaryMap;
-    
-    @BeforeClass
-    public static void before() throws IOException {
-        staticCreateTestMetadata();
-        
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
-        
-        cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
-        flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
-        dictionaryMap = getDictionaryMap(cube, flatTable);
-    }
-
-    @AfterClass
-    public static void after() throws Exception {
-        staticCleanupTestMetadata();
-    }
-
-    @Test
-    public void test() throws Exception {
-
-        InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
-        //DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
-        cubeBuilder.setConcurrentThreads(THREADS);
-        
-        ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
-        ExecutorService executorService = Executors.newSingleThreadExecutor();
-
-        try {
-            // round 1
-            {
-                Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
-                feedData(cube, flatTable, queue, INPUT_ROWS);
-                future.get();
-            }
-            
-            // round 2, zero input
-            {
-                Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
-                feedData(cube, flatTable, queue, 0);
-                future.get();
-            }
-            
-            // round 3
-            {
-                Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
-                feedData(cube, flatTable, queue, INPUT_ROWS);
-                future.get();
-            }
-            
-        } catch (Exception e) {
-            logger.error("stream build failed", e);
-            throw new IOException("Failed to build cube ", e);
-        }
-    }
-
-    static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count) throws IOException, InterruptedException {
-        feedData(cube, flatTable, queue, count, 0);
-    }
-    
-    static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count, long randSeed) throws IOException, InterruptedException {
-        CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), null);
-        int nColumns = flatTableDesc.getColumnList().size();
-
-        @SuppressWarnings("unchecked")
-        Set<String>[] distinctSets = new Set[nColumns];
-        for (int i = 0; i < nColumns; i++)
-            distinctSets[i] = new TreeSet<String>();
-
-        // get distinct values on each column
-        DFSFileTableReader reader = new DFSFileTableReader(flatTable, nColumns);
-        while (count > 0 && reader.next()) {
-            String[] row = reader.getRow();
-            for (int i = 0; i < nColumns; i++)
-                distinctSets[i].add(row[i]);
-        }
-        reader.close();
-
-        List<String[]> distincts = new ArrayList<String[]>();
-        for (int i = 0; i < nColumns; i++) {
-            distincts.add((String[]) distinctSets[i].toArray(new String[distinctSets[i].size()]));
-        }
-
-        Random rand = new Random();
-        if (randSeed != 0)
-            rand.setSeed(randSeed);
-        
-        // output with random data
-        for (; count > 0; count--) {
-            ArrayList<String> row = new ArrayList<String>(nColumns);
-            for (int i = 0; i < nColumns; i++) {
-                String[] candidates = distincts.get(i);
-                row.add(candidates[rand.nextInt(candidates.length)]);
-            }
-            queue.put(row);
-        }
-        queue.put(new ArrayList<String>(0));
-    }
-
-    static Map<TblColRef, Dictionary<?>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException {
-        Map<TblColRef, Dictionary<?>> result = Maps.newHashMap();
-        CubeDesc desc = cube.getDescriptor();
-        CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(desc, null);
-        int nColumns = flatTableDesc.getColumnList().size();
-
-        List<TblColRef> columns = Cuboid.getBaseCuboid(desc).getColumns();
-        for (int c = 0; c < columns.size(); c++) {
-            TblColRef col = columns.get(c);
-            if (desc.getRowkey().isUseDictionary(col)) {
-                logger.info("Building dictionary for " + col);
-                List<byte[]> valueList = readValueList(flatTable, nColumns, flatTableDesc.getRowKeyColumnIndexes()[c]);
-                Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(col.getType(), valueList);
-                result.put(col, dict);
-            }
-        }
-        return result;
-    }
-
-    private static List<byte[]> readValueList(String flatTable, int nColumns, int c) throws IOException {
-        List<byte[]> result = Lists.newArrayList();
-        DFSFileTableReader reader = new DFSFileTableReader(flatTable, nColumns);
-        while (reader.next()) {
-            String[] row = reader.getRow();
-            if (row[c] != null) {
-                result.add(Bytes.toBytes(row[c]));
-            }
-        }
-        reader.close();
-        return result;
-    }
-
-    class ConsoleGTRecordWriter implements ICuboidWriter {
-
-        boolean verbose = false;
-
-        @Override
-        public void write(long cuboidId, GTRecord record) throws IOException {
-            if (verbose)
-                System.out.println(record.toString());
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/inmemcubing/MemDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/MemDiskStoreTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/MemDiskStoreTest.java
deleted file mode 100644
index fd848f2..0000000
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/MemDiskStoreTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements. See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License. You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.kylin.job.inmemcubing;
-
-import static org.junit.Assert.*;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.kylin.common.util.MemoryBudgetController;
-import org.apache.kylin.storage.gridtable.GTBuilder;
-import org.apache.kylin.storage.gridtable.GTInfo;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.storage.gridtable.GTScanRequest;
-import org.apache.kylin.storage.gridtable.GridTable;
-import org.apache.kylin.storage.gridtable.IGTScanner;
-import org.apache.kylin.storage.gridtable.UnitTestSupport;
-import org.junit.Test;
-
-public class MemDiskStoreTest {
-
-    final MemoryBudgetController budgetCtrl = new MemoryBudgetController(20);
-    final GTInfo info = UnitTestSupport.advancedInfo();
-    final List<GTRecord> data = UnitTestSupport.mockupData(info, 1000000); // converts to about 34 MB data
-
-    @Test
-    public void testSingleThreadWriteRead() throws IOException {
-        long start = System.currentTimeMillis();
-        verifyOneTableWriteAndRead();
-        long end = System.currentTimeMillis();
-        System.out.println("Cost " + (end - start) + " millis");
-    }
-
-    @Test
-    public void testMultiThreadWriteRead() throws IOException, InterruptedException {
-        long start = System.currentTimeMillis();
-
-        int nThreads = 5;
-        Thread[] t = new Thread[nThreads];
-        for (int i = 0; i < nThreads; i++) {
-            t[i] = new Thread() {
-                public void run() {
-                    try {
-                        verifyOneTableWriteAndRead();
-                    } catch (Exception ex) {
-                        ex.printStackTrace();
-                    }
-                }
-            };
-            t[i].start();
-        }
-        for (int i = 0; i < nThreads; i++) {
-            t[i].join();
-        }
-
-        long end = System.currentTimeMillis();
-        System.out.println("Cost " + (end - start) + " millis");
-    }
-
-    private void verifyOneTableWriteAndRead() throws IOException {
-        MemDiskStore store = new MemDiskStore(info, budgetCtrl);
-        GridTable table = new GridTable(info, store);
-        verifyWriteAndRead(table);
-    }
-
-    private void verifyWriteAndRead(GridTable table) throws IOException {
-        GTInfo info = table.getInfo();
-
-        GTBuilder builder = table.rebuild();
-        for (GTRecord r : data) {
-            builder.write(r);
-        }
-        builder.close();
-
-        IGTScanner scanner = table.scan(new GTScanRequest(info));
-        int i = 0;
-        for (GTRecord r : scanner) {
-            assertEquals(data.get(i++), r);
-        }
-        scanner.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/inmemcubing/MemoryBudgetControllerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/MemoryBudgetControllerTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/MemoryBudgetControllerTest.java
deleted file mode 100644
index 6f86c95..0000000
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/MemoryBudgetControllerTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements. See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License. You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.kylin.job.inmemcubing;
-
-import org.apache.kylin.common.util.MemoryBudgetController;
-import org.apache.kylin.common.util.MemoryBudgetController.NotEnoughBudgetException;
-import org.junit.Test;
-
-import java.util.ArrayList;
-
-import static org.junit.Assert.*;
-
-public class MemoryBudgetControllerTest {
-
-    @Test
-    public void test() {
-        final int n = MemoryBudgetController.getSystemAvailMB() / 2;
-        final MemoryBudgetController mbc = new MemoryBudgetController(n);
-
-        ArrayList<Consumer> mbList = new ArrayList<Consumer>();
-        for (int i = 0; i < n; i++) {
-            mbList.add(new Consumer(mbc));
-            assertEquals(mbList.size(), mbc.getTotalReservedMB());
-        }
-
-        // a's reservation will free up all the previous
-        final Consumer a = new Consumer();
-        mbc.reserve(a, n);
-        for (int i = 0; i < n; i++) {
-            assertEquals(null, mbList.get(i).data);
-        }
-        
-        // cancel a in 2 seconds
-        new Thread() {
-            @Override
-            public void run() {
-                try {
-                    Thread.sleep(2000);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-                mbc.reserve(a, 0);
-            }
-        }.start();
-        
-        // b will success after some wait
-        long bWaitStart = System.currentTimeMillis();
-        final Consumer b = new Consumer();
-        mbc.reserveInsist(b, n);
-        assertTrue(System.currentTimeMillis() - bWaitStart > 1000);
-
-        try {
-            mbc.reserve(a, 1);
-            fail();
-        } catch (NotEnoughBudgetException ex) {
-            // expected
-        }
-    }
-
-    class Consumer implements MemoryBudgetController.MemoryConsumer {
-
-        byte[] data;
-
-        Consumer() {
-        }
-
-        Consumer(MemoryBudgetController mbc) {
-            mbc.reserve(this, 1);
-            data = new byte[MemoryBudgetController.ONE_MB - 24]; // 24 is object shell of this + object shell of data + reference of data 
-        }
-
-        @Override
-        public int freeUp(int mb) {
-            if (data != null) {
-                data = null;
-                return 1;
-            } else {
-                return 0;
-            }
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/test/java/org/apache/kylin/job/manager/ExecutableManagerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/manager/ExecutableManagerTest.java b/job/src/test/java/org/apache/kylin/job/manager/ExecutableManagerTest.java
deleted file mode 100644
index 1d28fc8..0000000
--- a/job/src/test/java/org/apache/kylin/job/manager/ExecutableManagerTest.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.manager;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.job.BaseTestExecutable;
-import org.apache.kylin.job.SucceedTestExecutable;
-import org.apache.kylin.job.exception.IllegalStateTranferException;
-import org.apache.kylin.job.execution.ChainedExecutable;
-import org.apache.kylin.job.execution.Executable;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.List;
-
-import static org.junit.Assert.*;
-
-/**
- */
-public class ExecutableManagerTest extends LocalFileMetadataTestCase {
-
-    private ExecutableManager service;
-
-    @Before
-    public void setup() throws Exception {
-        createTestMetadata();
-        service = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
-
-        for (String jobId: service.getAllJobIds()) {
-            System.out.println("deleting " + jobId);
-            service.deleteJob(jobId);
-        }
-
-    }
-
-    @After
-    public void after() throws Exception {
-        cleanupTestMetadata();
-    }
-
-    @Test
-    public void test() throws Exception {
-        assertNotNull(service);
-        BaseTestExecutable executable = new SucceedTestExecutable();
-        executable.setParam("test1", "test1");
-        executable.setParam("test2", "test2");
-        executable.setParam("test3", "test3");
-        service.addJob(executable);
-        List<AbstractExecutable> result = service.getAllExecutables();
-        assertEquals(1, result.size());
-        AbstractExecutable another = service.getJob(executable.getId());
-        assertJobEqual(executable, another);
-
-        service.updateJobOutput(executable.getId(), ExecutableState.RUNNING, null, "test output");
-        assertJobEqual(executable, service.getJob(executable.getId()));
-    }
-
-    @Test
-    public void testDefaultChainedExecutable() throws Exception {
-        DefaultChainedExecutable job = new DefaultChainedExecutable();
-        job.addTask(new SucceedTestExecutable());
-        job.addTask(new SucceedTestExecutable());
-
-        service.addJob(job);
-        assertEquals(2, job.getTasks().size());
-        AbstractExecutable anotherJob = service.getJob(job.getId());
-        assertEquals(DefaultChainedExecutable.class, anotherJob.getClass());
-        assertEquals(2, ((DefaultChainedExecutable) anotherJob).getTasks().size());
-        assertJobEqual(job, anotherJob);
-    }
-
-    @Test
-    public void testValidStateTransfer() throws Exception {
-        SucceedTestExecutable job = new SucceedTestExecutable();
-        String id = job.getId();
-        service.addJob(job);
-        service.updateJobOutput(id, ExecutableState.RUNNING, null, null);
-        service.updateJobOutput(id, ExecutableState.ERROR, null, null);
-        service.updateJobOutput(id, ExecutableState.READY, null, null);
-        service.updateJobOutput(id, ExecutableState.RUNNING, null, null);
-        service.updateJobOutput(id, ExecutableState.READY, null, null);
-        service.updateJobOutput(id, ExecutableState.RUNNING, null, null);
-        service.updateJobOutput(id, ExecutableState.SUCCEED, null, null);
-    }
-
-    @Test(expected = IllegalStateTranferException.class)
-    public void testInvalidStateTransfer(){
-        SucceedTestExecutable job = new SucceedTestExecutable();
-        service.addJob(job);
-        service.updateJobOutput(job.getId(), ExecutableState.RUNNING, null, null);
-        service.updateJobOutput(job.getId(), ExecutableState.STOPPED, null, null);
-    }
-
-
-
-    private static void assertJobEqual(Executable one, Executable another) {
-        assertEquals(one.getClass(), another.getClass());
-        assertEquals(one.getId(), another.getId());
-        assertEquals(one.getStatus(), another.getStatus());
-        assertEquals(one.isRunnable(), another.isRunnable());
-        assertEquals(one.getOutput(), another.getOutput());
-        assertTrue((one.getParams() == null && another.getParams() == null) || (one.getParams() != null && another.getParams() != null));
-        if (one.getParams() != null) {
-            assertEquals(one.getParams().size(), another.getParams().size());
-            for (String key : one.getParams().keySet()) {
-                assertEquals(one.getParams().get(key), another.getParams().get(key));
-            }
-        }
-        if (one instanceof ChainedExecutable) {
-            assertTrue(another instanceof ChainedExecutable);
-            List<? extends Executable> onesSubs = ((ChainedExecutable) one).getTasks();
-            List<? extends Executable> anotherSubs = ((ChainedExecutable) another).getTasks();
-            assertTrue((onesSubs == null && anotherSubs == null) || (onesSubs != null && anotherSubs != null));
-            if (onesSubs != null) {
-                assertEquals(onesSubs.size(), anotherSubs.size());
-                for (int i = 0; i < onesSubs.size(); ++i) {
-                    assertJobEqual(onesSubs.get(i), anotherSubs.get(i));
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 1e55b2c..325cbee 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -42,7 +42,7 @@ import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.BuildEngineFactory;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.job.common.HadoopShellExecutable;
+import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
@@ -65,7 +65,7 @@ import org.apache.kylin.rest.response.MetricsResponse;
 import org.apache.kylin.rest.security.AclPermission;
 import org.apache.kylin.source.hive.HiveSourceTableLoader;
 import org.apache.kylin.storage.hbase.HBaseConnection;
-import org.apache.kylin.storage.hbase.HBaseRegionSizeCalculator;
+import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/server/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/JobService.java b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
index 96b615f..bb17bd2 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -31,9 +31,9 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeBuildTypeEnum;
 import org.apache.kylin.engine.BuildEngineFactory;
 import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.job.JobInstance;
-import org.apache.kylin.job.common.HadoopShellExecutable;
-import org.apache.kylin.job.common.MapReduceExecutable;
 import org.apache.kylin.job.common.ShellExecutable;
 import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.constant.JobStepStatusEnum;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/source-hive/pom.xml
----------------------------------------------------------------------
diff --git a/source-hive/pom.xml b/source-hive/pom.xml
index 17a918e..8b4a973 100644
--- a/source-hive/pom.xml
+++ b/source-hive/pom.xml
@@ -48,6 +48,11 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hive.hcatalog</groupId>
             <artifactId>hive-hcatalog-core</artifactId>
             <version>${hive-hcatalog.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
new file mode 100644
index 0000000..c40924e
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.cmd.ShellCmdOutput;
+import org.apache.kylin.job.common.ShellExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+
+/**
+ * {@link IMRInput} implementation backed by Hive: tables are read through
+ * HCatalog and the cubing flat table is created with the {@code hive} command.
+ */
+public class HiveMRInput implements IMRInput {
+
+    @Override
+    public IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
+        return new BatchCubingInputSide(seg);
+    }
+
+    @Override
+    public IMRTableInputFormat getTableInputFormat(TableDesc table) {
+        return new HiveTableInputFormat(table.getIdentity());
+    }
+
+    /**
+     * Points a job at one Hive table via {@link HCatInputFormat} and turns each
+     * mapper input record into a string array.
+     */
+    public static class HiveTableInputFormat implements IMRTableInputFormat {
+        final String dbName;
+        final String tableName;
+
+        /**
+         * @param hiveTable name handed to {@link HadoopUtil#parseHiveTableName};
+         *            element 0 of the result is kept as the database name and
+         *            element 1 as the table name
+         */
+        public HiveTableInputFormat(String hiveTable) {
+            String[] parts = HadoopUtil.parseHiveTableName(hiveTable);
+            dbName = parts[0];
+            tableName = parts[1];
+        }
+
+        /**
+         * Registers the table as the job's HCatalog input and selects
+         * {@link HCatInputFormat} as the input format class. An
+         * {@link IOException} from that setup is rethrown as a
+         * {@link RuntimeException}.
+         */
+        @Override
+        public void configureJob(Job job) {
+            try {
+                HCatInputFormat.setInput(job, dbName, tableName);
+                job.setInputFormatClass(HCatInputFormat.class);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        /**
+         * @param mapperInput the mapper input value; it is cast to
+         *            {@link HCatRecord}
+         * @return the result of {@code HiveTableReader.getRowAsStringArray}
+         */
+        @Override
+        public String[] parseMapperInput(Object mapperInput) {
+            return HiveTableReader.getRowAsStringArray((HCatRecord) mapperInput);
+        }
+
+    }
+
+    /**
+     * Contributes the Hive-side steps of a batch cubing job for one segment:
+     * building the intermediate flat table and dropping it during cleanup.
+     */
+    public static class BatchCubingInputSide implements IMRBatchCubingInputSide {
+
+        final JobEngineConfig conf;
+        final CubeSegment seg;
+        final CubeJoinedFlatTableDesc flatHiveTableDesc;
+
+        public BatchCubingInputSide(CubeSegment seg) {
+            this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
+            this.seg = seg;
+            this.flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
+        }
+
+        @Override
+        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
+            jobFlow.addTask(createFlatHiveTableStep(conf, flatHiveTableDesc, jobFlow.getId()));
+        }
+
+        /**
+         * Builds a shell step that runs the drop, create and insert statements
+         * for the flat table in one {@code hive -e "..."} invocation.
+         *
+         * @throws RuntimeException if generating the insert statement fails
+         *             with an {@link IOException}
+         */
+        public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId) {
+
+            final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatTableDesc);
+            final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
+            String insertDataHqls;
+            try {
+                insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, conf);
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to generate insert data SQL for intermediate table.", e);
+            }
+
+            ShellExecutable step = new ShellExecutable();
+            // The buffer never leaves this method, so the unsynchronized StringBuilder suffices.
+            StringBuilder buf = new StringBuilder();
+            buf.append("hive -e \"");
+            buf.append(dropTableHql).append("\n");
+            buf.append(createTableHql).append("\n");
+            buf.append(insertDataHqls).append("\n");
+            buf.append("\"");
+
+            step.setCmd(buf.toString());
+            step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
+
+            return step;
+        }
+
+        @Override
+        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+            GarbageCollectionStep step = new GarbageCollectionStep();
+            step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+            step.setOldHiveTable(flatHiveTableDesc.getTableName());
+            jobFlow.addTask(step);
+        }
+
+        @Override
+        public IMRTableInputFormat getFlatTableInputFormat() {
+            return new HiveTableInputFormat(flatHiveTableDesc.getTableName());
+        }
+
+    }
+
+    /**
+     * Cleanup step that drops the Hive table stored in the {@code oldHiveTable}
+     * parameter, when that parameter is non-empty.
+     */
+    public static class GarbageCollectionStep extends AbstractExecutable {
+
+        /**
+         * Issues {@code DROP TABLE IF EXISTS} through a {@code hive -e} command.
+         *
+         * @return ERROR, carrying the command output and the exception message,
+         *         when executing the command throws an {@link IOException};
+         *         SUCCEED otherwise, including when no table name is set
+         */
+        @Override
+        protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+            // Method-local accumulator, so the unsynchronized StringBuilder suffices.
+            StringBuilder output = new StringBuilder();
+
+            final String hiveTable = this.getOldHiveTable();
+            if (StringUtils.isNotEmpty(hiveTable)) {
+                final String dropHiveCMD = "hive -e \"DROP TABLE IF EXISTS  " + hiveTable + ";\"";
+                ShellCmdOutput shellCmdOutput = new ShellCmdOutput();
+                try {
+                    context.getConfig().getCliCommandExecutor().execute(dropHiveCMD, shellCmdOutput);
+                    output.append("Hive table " + hiveTable + " is dropped. \n");
+                } catch (IOException e) {
+                    logger.error("job:" + getId() + " execute finished with exception", e);
+                    output.append(shellCmdOutput.getOutput()).append("\n").append(e.getLocalizedMessage());
+                    return new ExecuteResult(ExecuteResult.State.ERROR, output.toString());
+                }
+            }
+
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+        }
+
+        public void setOldHiveTable(String hiveTable) {
+            setParam("oldHiveTable", hiveTable);
+        }
+
+        private String getOldHiveTable() {
+            return getParam("oldHiveTable");
+        }
+    }
+
+}