Posted to commits@kylin.apache.org by ma...@apache.org on 2015/05/26 17:21:12 UTC

[26/32] incubator-kylin git commit: KYLIN-693 character all build cube/ii test cases

KYLIN-693 character all build cube/ii test cases


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/29058abc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/29058abc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/29058abc

Branch: refs/heads/0.8.0
Commit: 29058abc932806769bba4d2cbbfcd3561515934b
Parents: 3a0d9d2
Author: honma <ho...@ebay.com>
Authored: Mon May 25 14:15:39 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue May 26 23:21:27 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/BuildCubeWithEngineTest.java      | 279 +++++++++++++++++++
 .../kylin/job/BuildCubeWithStreamTest.java      | 224 +++++++++++++++
 .../apache/kylin/job/BuildIIWithEngineTest.java | 254 +++++++++++++++++
 .../apache/kylin/job/BuildIIWithStreamTest.java | 270 ++++++++++++++++++
 .../kylin/job/ITBuildCubeWithEngineTest.java    | 279 -------------------
 .../kylin/job/ITBuildCubeWithStreamTest.java    | 224 ---------------
 .../kylin/job/ITBuildIIWithEngineTest.java      | 254 -----------------
 .../kylin/job/ITBuildIIWithStreamTest.java      | 270 ------------------
 8 files changed, 1027 insertions(+), 1027 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/29058abc/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
new file mode 100644
index 0000000..547a6e7
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.cube.CubingJob;
+import org.apache.kylin.job.cube.CubingJobBuilder;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
+import org.apache.kylin.job.manager.ExecutableManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.TimeZone;
+import java.util.concurrent.*;
+
+import static org.junit.Assert.assertEquals;
+
+public class BuildCubeWithEngineTest {
+
+    private JobEngineConfig jobEngineConfig;
+
+    private CubeManager cubeManager;
+
+    private DefaultScheduler scheduler;
+
+    protected ExecutableManager jobService;
+
+    private static final Log logger = LogFactory.getLog(BuildCubeWithEngineTest.class);
+
+    protected void waitForJob(String jobId) {
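+        // Poll the job status every five seconds until it reaches a terminal state (SUCCEED or ERROR).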
+        while (true) {
+            AbstractExecutable job = jobService.getJob(jobId);
+            if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR) {
+                break;
+            } else {
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+        System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this
+    }
+
+    @Before
+    public void before() throws Exception {
+        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+
+        DeployUtil.initCliWorkDir();
+        DeployUtil.deployMetadata();
+        DeployUtil.overrideJobJarLocations();
+
+
+        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        jobService = ExecutableManager.getInstance(kylinConfig);
+        scheduler = DefaultScheduler.getInstance();
+        scheduler.init(new JobEngineConfig(kylinConfig));
+        if (!scheduler.hasStarted()) {
+            throw new RuntimeException("scheduler has not been started");
+        }
+        cubeManager = CubeManager.getInstance(kylinConfig);
+        jobEngineConfig = new JobEngineConfig(kylinConfig);
+        for (String jobId : jobService.getAllJobIds()) {
+            if(jobService.getJob(jobId) instanceof CubingJob){
+                jobService.deleteJob(jobId);
+            }
+        }
+
+    }
+
+    @After
+    public void after() {
+        HBaseMetadataTestCase.staticCleanupTestMetadata();
+    }
+
+    @Test
+    public void test() throws Exception {
+        DeployUtil.prepareTestData("left", "test_kylin_cube_with_slr_left_join_empty");
+        testInner();
+        testLeft();
+    }
+
+    private void testInner() throws Exception {
+        String[] testCase = new String[]{
+                "testInnerJoinCube",
+                "testInnerJoinCube2",
+        };
+        runTestAndAssertSucceed(testCase);
+    }
+
+    private void testLeft() throws Exception {
+        String[] testCase = new String[]{
+                "testLeftJoinCube",
+                "testLeftJoinCube2",
+        };
+        runTestAndAssertSucceed(testCase);
+    }
+
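+    // Submit each named test case to its own worker thread, wait on the latch until all of them
+    // have finished, then assert that every job id they produced completed successfully.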
+    private void runTestAndAssertSucceed(String[] testCase) throws Exception {
+        ExecutorService executorService = Executors.newFixedThreadPool(testCase.length);
+        final CountDownLatch countDownLatch = new CountDownLatch(testCase.length);
+        List<Future<List<String>>> tasks = Lists.newArrayListWithExpectedSize(testCase.length);
+        for (int i = 0; i < testCase.length; i++) {
+            tasks.add(executorService.submit(new TestCallable(testCase[i], countDownLatch)));
+        }
+        countDownLatch.await();
+        try {
+            for (int i = 0; i < tasks.size(); ++i) {
+                Future<List<String>> task = tasks.get(i);
+                final List<String> jobIds = task.get();
+                for (String jobId : jobIds) {
+                    assertJobSucceed(jobId);
+                }
+            }
+        } catch (Exception ex) {
+            logger.error(ex);
+            throw ex;
+        }
+    }
+
+    private void assertJobSucceed(String jobId) {
+        assertEquals("The job '" + jobId + "' is failed.", ExecutableState.SUCCEED, jobService.getOutput(jobId).getState());
+    }
+
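+    // Each callable looks up the private test method by name, invokes it via reflection, and
+    // counts down the latch when done so the caller knows all cases have completed.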
+    private class TestCallable implements Callable<List<String>> {
+
+        private final String methodName;
+        private final CountDownLatch countDownLatch;
+
+        public TestCallable(String methodName, CountDownLatch countDownLatch) {
+            this.methodName = methodName;
+            this.countDownLatch = countDownLatch;
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public List<String> call() throws Exception {
+            try {
+                final Method method = BuildCubeWithEngineTest.class.getDeclaredMethod(methodName);
+                method.setAccessible(true);
+                return (List<String>) method.invoke(BuildCubeWithEngineTest.this);
+            } finally {
+                countDownLatch.countDown();
+            }
+        }
+    }
+
+    @SuppressWarnings("unused") // called by reflection
+    private List<String> testInnerJoinCube2() throws Exception {
+        clearSegment("test_kylin_cube_with_slr_empty");
+        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+        f.setTimeZone(TimeZone.getTimeZone("GMT"));
+        long date1 = 0;
+        long date2 = f.parse("2013-01-01").getTime();
+        long date3 = f.parse("2022-01-01").getTime();
+        List<String> result = Lists.newArrayList();
+        result.add(buildSegment("test_kylin_cube_with_slr_empty", date1, date2));
+        result.add(buildSegment("test_kylin_cube_with_slr_empty", date2, date3));
+        return result;
+    }
+
+    @SuppressWarnings("unused") // called by reflection
+    private List<String> testInnerJoinCube() throws Exception {
+        clearSegment("test_kylin_cube_without_slr_empty");
+
+
+        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+        f.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+        // this cube's start date is 0, end date is 20501112000000
+        long date1 = 0;
+        long date2 = f.parse("2013-01-01").getTime();
+
+
+        // this cube doesn't support incremental build, always do full build
+
+        List<String> result = Lists.newArrayList();
+        result.add(buildSegment("test_kylin_cube_without_slr_empty", date1, date2));
+        return result;
+    }
+
+    @SuppressWarnings("unused") // called by reflection
+    private List<String> testLeftJoinCube2() throws Exception {
+        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+        f.setTimeZone(TimeZone.getTimeZone("GMT"));
+        List<String> result = Lists.newArrayList();
+        final String cubeName = "test_kylin_cube_without_slr_left_join_empty";
+        // this cube's start date is 0, end date is 20120601000000
+        long dateStart = cubeManager.getCube(cubeName).getDescriptor().getModel().getPartitionDesc().getPartitionDateStart();
+        long dateEnd = f.parse("2012-06-01").getTime();
+
+        clearSegment(cubeName);
+        result.add(buildSegment(cubeName, dateStart, dateEnd));
+
+        // then submit an append job, start date is 20120601000000, end
+        // date is 20220101000000
+        dateStart = f.parse("2012-06-01").getTime();
+        dateEnd = f.parse("2022-01-01").getTime();
+        result.add(buildSegment(cubeName, dateStart, dateEnd));
+        return result;
+
+    }
+
+    @SuppressWarnings("unused") // called by reflection
+    private List<String> testLeftJoinCube() throws Exception {
+        String cubeName = "test_kylin_cube_with_slr_left_join_empty";
+        clearSegment(cubeName);
+
+        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+        f.setTimeZone(TimeZone.getTimeZone("GMT"));
+        long dateStart = cubeManager.getCube(cubeName).getDescriptor().getModel().getPartitionDesc().getPartitionDateStart();
+        long dateEnd = f.parse("2050-11-12").getTime();
+
+        // this cube's start date is 0, end date is 20501112000000
+        List<String> result = Lists.newArrayList();
+        result.add(buildSegment(cubeName, dateStart, dateEnd));
+        return result;
+
+    }
+
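+    // Remove all existing segments from the cube so it can be rebuilt from scratch.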
+    private void clearSegment(String cubeName) throws Exception {
+        CubeInstance cube = cubeManager.getCube(cubeName);
+        cube.getSegments().clear();
+        cubeManager.updateCube(cube,true);
+    }
+
+
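+    // Append a new segment up to endDate, submit a cubing job for it, and block until the job reaches a terminal state.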
+    private String buildSegment(String cubeName, long startDate, long endDate) throws Exception {
+        CubeSegment segment = cubeManager.appendSegments(cubeManager.getCube(cubeName), endDate);
+        CubingJobBuilder cubingJobBuilder = new CubingJobBuilder(jobEngineConfig);
+        CubingJob job = cubingJobBuilder.buildJob(segment);
+        jobService.addJob(job);
+        waitForJob(job.getId());
+        return job.getId();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/29058abc/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
new file mode 100644
index 0000000..dc42dc5
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
@@ -0,0 +1,224 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.job;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.lookup.HiveTableReader;
+import org.apache.kylin.job.hadoop.cubev2.IGTRecordWriter;
+import org.apache.kylin.job.hadoop.cubev2.InMemCubeBuilder;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.junit.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ *
+ * This class is going to be deleted
+ */
+@Ignore("For dev testing")
+public class BuildCubeWithStreamTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStreamTest.class);
+
+    private KylinConfig kylinConfig;
+    private CubeManager cubeManager;
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this
+    }
+
+    @Before
+    public void before() throws Exception {
+        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+        DeployUtil.overrideJobJarLocations();
+
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+        cubeManager = CubeManager.getInstance(kylinConfig);
+
+    }
+
+    @After
+    public void after() {
+        HBaseMetadataTestCase.staticCleanupTestMetadata();
+    }
+
+    @Test
+    public void test() throws Exception {
+        CubeInstance cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
+        final CubeDesc desc = cube.getDescriptor();
+        //   cube.getSegments().clear();
+        //   cubeManager.updateCube(cube);
+
+        CubeSegment cubeSegment = cube.getSegment("19700101000000_20150401000000", SegmentStatusEnum.NEW);
+        Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
+
+//
+        for (DimensionDesc dim : desc.getDimensions()) {
+            // dictionary
+            for (TblColRef col : dim.getColumnRefs()) {
+                if (desc.getRowkey().isUseDictionary(col)) {
+                    Dictionary<?> dict = cubeSegment.getDictionary(col);
+                    if (dict == null) {
+                        throw new IllegalArgumentException("Dictionary for " + col + " was not found.");
+                    }
+                    logger.info("Dictionary for " + col + " was put into the dictionary map.");
+                    dictionaryMap.put(col, dict);
+                }
+            }
+        }
+
+//        final String tableName = createIntermediateTable(desc, kylinConfig, null);
+        String tableName = "kylin_intermediate_test_kylin_cube_without_slr_desc_19700101000000_20130112000000_a24dec89_efbd_425f_9a5f_8b78dd1412af"; // has 3089 records;
+//        tableName = "kylin_intermediate_test_kylin_cube_without_slr_desc_19700101000000_20130112000000_a5e1eb5d_da6b_475d_9807_be0b61f03215"; // only 20 rows;
+//        tableName = "kylin_intermediate_test_kylin_cube_without_slr_left_join_desc_19700101000000_20150302000000_0a183367_f245_43d1_8850_1c138c8514c3";
+//        tableName = "kylin_intermediate_test_kylin_cube_without_slr_left_join_desc_19700101000000_20150301000000_ce061464_7962_4642_bd7d_7c3d8fbe9389";
+        tableName = "kylin_intermediate_test_kylin_cube_without_slr_left_join_desc_19700101000000_20150401000000_fb7ae579_d987_4900_a3b7_c60c731cd269"; // 2 million records
+        logger.info("intermediate table name:" + tableName);
+
+
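+        // Rows are fed into this queue for the cube builder running on a separate thread; an empty list is pushed afterwards to mark the end of input.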
+        ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(10000);
+
+        InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube, dictionaryMap, new ConsoleGTRecordWriter());
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        Future<?> future = executorService.submit(cubeBuilder);
+
+        final Configuration conf = new Configuration();
+        HCatInputFormat.setInput(conf, "default", tableName);
+        final HCatSchema tableSchema = HCatInputFormat.getTableSchema(conf);
+        logger.info(StringUtils.join(tableSchema.getFieldNames(), "\n"));
+        HiveTableReader reader = new HiveTableReader("default", tableName);
+        List<String> row;
+        int counter = 0;
+        while (reader.next()) {
+            row = reader.getRowAsList();
+            queue.put(row);
+            counter++;
+            if(counter == 200000)
+                break;
+        }
+        queue.put(new ArrayList<String>(0));
+        reader.close();
+
+        try {
+            future.get();
+        } catch (Exception e) {
+            logger.error("stream build failed", e);
+            throw new IOException("Failed to build cube ", e);
+        }
+
+        logger.info("stream build finished");
+    }
+
+
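+    // Collect the distinct values of each dictionary-encoded dimension column and build an in-memory dictionary per column.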
+    private void buildDictionary(List<List<String>> table, CubeDesc desc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
+        SetMultimap<TblColRef, String> valueMap = HashMultimap.create();
+
+        List<TblColRef> dimColumns = desc.listDimensionColumnsExcludingDerived();
+        for (List<String> row : table) {
+            for (int i = 0; i < dimColumns.size(); i++) {
+                String cell = row.get(i);
+                valueMap.put(dimColumns.get(i), cell);
+            }
+        }
+
+        for (DimensionDesc dim : desc.getDimensions()) {
+            // dictionary
+            for (TblColRef col : dim.getColumnRefs()) {
+                if (desc.getRowkey().isUseDictionary(col)) {
+                    Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(col.getType(), Collections2.transform(valueMap.get(col), new Function<String, byte[]>() {
+                        @Nullable
+                        @Override
+                        public byte[] apply(String input) {
+                            if (input == null)
+                                return null;
+                            return input.getBytes();
+                        }
+                    }));
+
+                    logger.info("Building dictionary for " + col);
+                    dictionaryMap.put(col, dict);
+                }
+            }
+        }
+
+    }
+
+
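+    // Debugging record writer that prints each GTRecord to stdout when verbose is enabled.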
+    class ConsoleGTRecordWriter implements IGTRecordWriter {
+
+        boolean verbose = false;
+
+        @Override
+        public void write(Long cuboidId, GTRecord record) throws IOException {
+            if (verbose)
+                System.out.println(record.toString());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/29058abc/job/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java b/job/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
new file mode 100644
index 0000000..9c0219a
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.hadoop.cube.StorageCleanupJob;
+import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
+import org.apache.kylin.job.invertedindex.IIJob;
+import org.apache.kylin.job.invertedindex.IIJobBuilder;
+import org.apache.kylin.job.manager.ExecutableManager;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.junit.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.TimeZone;
+import java.util.concurrent.*;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author shaoshi
+ */
+public class BuildIIWithEngineTest {
+
+    private JobEngineConfig jobEngineConfig;
+    private IIManager iiManager;
+
+    private DefaultScheduler scheduler;
+    protected ExecutableManager jobService;
+
+    protected static final String[] TEST_II_INSTANCES = new String[] { "test_kylin_ii_inner_join", "test_kylin_ii_left_join" };
+
+    private static final Log logger = LogFactory.getLog(BuildIIWithEngineTest.class);
+
+    protected void waitForJob(String jobId) {
+        while (true) {
+            AbstractExecutable job = jobService.getJob(jobId);
+            if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR) {
+                break;
+            } else {
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this
+    }
+
+    @Before
+    public void before() throws Exception {
+        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+
+        //DeployUtil.initCliWorkDir();
+        //        DeployUtil.deployMetadata();
+        DeployUtil.overrideJobJarLocations();
+
+        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        jobService = ExecutableManager.getInstance(kylinConfig);
+        scheduler = DefaultScheduler.getInstance();
+        scheduler.init(new JobEngineConfig(kylinConfig));
+        if (!scheduler.hasStarted()) {
+            throw new RuntimeException("scheduler has not been started");
+        }
+        jobEngineConfig = new JobEngineConfig(kylinConfig);
+        for (String jobId : jobService.getAllJobIds()) {
+            if (jobService.getJob(jobId) instanceof IIJob) {
+                jobService.deleteJob(jobId);
+            }
+        }
+
+        iiManager = IIManager.getInstance(kylinConfig);
+        for (String iiInstance : TEST_II_INSTANCES) {
+
+            IIInstance ii = iiManager.getII(iiInstance);
+            if (ii.getStatus() != RealizationStatusEnum.DISABLED) {
+                ii.setStatus(RealizationStatusEnum.DISABLED);
+                iiManager.updateII(ii, true);
+            }
+        }
+    }
+
+    @After
+    public void after() throws Exception {
+
+        for (String iiInstance : TEST_II_INSTANCES) {
+            IIInstance ii = iiManager.getII(iiInstance);
+            if (ii.getStatus() != RealizationStatusEnum.READY) {
+                ii.setStatus(RealizationStatusEnum.READY);
+                iiManager.updateII(ii, true);
+            }
+        }
+        backup();
+    }
+
+    @Test
+    @Ignore
+    public void testBuildII() throws Exception {
+
+        String[] testCase = new String[] { "buildIIInnerJoin", "buildIILeftJoin" };
+        ExecutorService executorService = Executors.newFixedThreadPool(testCase.length);
+        final CountDownLatch countDownLatch = new CountDownLatch(testCase.length);
+        List<Future<List<String>>> tasks = Lists.newArrayListWithExpectedSize(testCase.length);
+        for (int i = 0; i < testCase.length; i++) {
+            tasks.add(executorService.submit(new TestCallable(testCase[i], countDownLatch)));
+        }
+        countDownLatch.await();
+        for (int i = 0; i < tasks.size(); ++i) {
+            Future<List<String>> task = tasks.get(i);
+            final List<String> jobIds = task.get();
+            for (String jobId : jobIds) {
+                assertJobSucceed(jobId);
+            }
+        }
+
+    }
+
+    private void assertJobSucceed(String jobId) {
+        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(jobId).getState());
+    }
+
+    private class TestCallable implements Callable<List<String>> {
+
+        private final String methodName;
+        private final CountDownLatch countDownLatch;
+
+        public TestCallable(String methodName, CountDownLatch countDownLatch) {
+            this.methodName = methodName;
+            this.countDownLatch = countDownLatch;
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public List<String> call() throws Exception {
+            try {
+                final Method method = BuildIIWithEngineTest.class.getDeclaredMethod(methodName);
+                method.setAccessible(true);
+                return (List<String>) method.invoke(BuildIIWithEngineTest.this);
+            } finally {
+                countDownLatch.countDown();
+            }
+        }
+    }
+
+    protected List<String> buildIIInnerJoin() throws Exception {
+        return buildII(TEST_II_INSTANCES[0]);
+    }
+
+    protected List<String> buildIILeftJoin() throws Exception {
+        return buildII(TEST_II_INSTANCES[1]);
+    }
+
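+    // Clear any existing segments, then build a single segment from epoch to 2015-01-01 and return its job id.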
+    protected List<String> buildII(String iiName) throws Exception {
+        clearSegment(iiName);
+
+        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+        f.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+        long date1 = 0;
+        long date2 = f.parse("2015-01-01").getTime();
+
+        List<String> result = Lists.newArrayList();
+        result.add(buildSegment(iiName, date1, date2));
+        return result;
+    }
+
+    private void clearSegment(String iiName) throws Exception {
+        IIInstance ii = iiManager.getII(iiName);
+        ii.getSegments().clear();
+        iiManager.updateII(ii,true);
+    }
+
+    private String buildSegment(String iiName, long startDate, long endDate) throws Exception {
+        IIInstance iiInstance = iiManager.getII(iiName);
+        IISegment segment = iiManager.buildSegment(iiInstance, startDate, endDate);
+        iiInstance.getSegments().add(segment);
+        iiManager.updateII(iiInstance, true);
+        IIJobBuilder iiJobBuilder = new IIJobBuilder(jobEngineConfig);
+        IIJob job = iiJobBuilder.buildJob(segment);
+        jobService.addJob(job);
+        waitForJob(job.getId());
+        return job.getId();
+    }
+
+    private int cleanupOldStorage() throws Exception {
+        String[] args = { "--delete", "true" };
+
+        int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
+        return exitCode;
+    }
+
+    private void backup() throws Exception {
+        int exitCode = cleanupOldStorage();
+        if (exitCode == 0) {
+            exportHBaseData();
+        }
+    }
+
+    private void exportHBaseData() throws IOException {
+        ExportHBaseData export = new ExportHBaseData();
+        export.exportTables();
+    }
+
+    public static void main(String[] args) throws Exception {
+        BuildIIWithEngineTest instance = new BuildIIWithEngineTest();
+
+        BuildIIWithEngineTest.beforeClass();
+        instance.before();
+        instance.testBuildII();
+        instance.after();
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/29058abc/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
new file mode 100644
index 0000000..6bceab7
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -0,0 +1,270 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.job;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.dict.lookup.HiveTableReader;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
+import org.apache.kylin.job.common.ShellExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.hadoop.cube.StorageCleanupJob;
+import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.streaming.StreamMessage;
+import org.apache.kylin.streaming.invertedindex.IIStreamBuilder;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import static org.junit.Assert.fail;
+
+/**
+ */
+public class BuildIIWithStreamTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(BuildIIWithStreamTest.class);
+
+    private static final String[] II_NAME = new String[] { "test_kylin_ii_left_join", "test_kylin_ii_inner_join" };
+    private IIManager iiManager;
+    private KylinConfig kylinConfig;
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this
+    }
+
+    @Before
+    public void before() throws Exception {
+        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+        DeployUtil.overrideJobJarLocations();
+
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+        iiManager = IIManager.getInstance(kylinConfig);
+        for (String iiInstance : II_NAME) {
+
+            IIInstance ii = iiManager.getII(iiInstance);
+            if (ii.getStatus() != RealizationStatusEnum.DISABLED) {
+                ii.setStatus(RealizationStatusEnum.DISABLED);
+                iiManager.updateII(ii,true);
+            }
+        }
+    }
+
+    @AfterClass
+    public static void afterClass() throws Exception {
+        backup();
+    }
+
+    private static int cleanupOldStorage() throws Exception {
+        String[] args = { "--delete", "true" };
+        int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
+        return exitCode;
+    }
+
+    private static void backup() throws Exception {
+        int exitCode = cleanupOldStorage();
+        if (exitCode == 0) {
+            exportHBaseData();
+        }
+    }
+
+    private static void exportHBaseData() throws IOException {
+        ExportHBaseData export = new ExportHBaseData();
+        export.exportTables();
+    }
+
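+    // Generate the Hive drop/create/insert statements for the II flat table and run them through the Kylin CLI command executor.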
+    private String createIntermediateTable(IIDesc desc, KylinConfig kylinConfig) throws IOException {
+        IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(desc);
+        JobEngineConfig jobEngineConfig = new JobEngineConfig(kylinConfig);
+        final String uuid = UUID.randomUUID().toString();
+        final String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc, uuid);
+        final String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, jobEngineConfig.getHdfsWorkingDirectory() + "/kylin-" + uuid, uuid);
+        String insertDataHqls;
+        try {
+            insertDataHqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, uuid, jobEngineConfig);
+        } catch (IOException e1) {
+            throw new RuntimeException("Failed to generate insert data SQL for intermediate table.", e1);
+        }
+
+        ShellExecutable step = new ShellExecutable();
+        StringBuffer buf = new StringBuffer();
+        buf.append("hive -e \"");
+        buf.append(dropTableHql + "\n");
+        buf.append(createTableHql + "\n");
+        buf.append(insertDataHqls + "\n");
+        buf.append("\"");
+
+        step.setCmd(buf.toString());
+        logger.info(step.getCmd());
+        step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
+        kylinConfig.getCliCommandExecutor().execute(step.getCmd(), null);
+        return intermediateTableDesc.getTableName(uuid);
+    }
+
+    private void clearSegment(String iiName) throws Exception {
+        IIInstance ii = iiManager.getII(iiName);
+        ii.getSegments().clear();
+        iiManager.updateII(ii,true);
+    }
+
+    private IISegment createSegment(String iiName) throws Exception {
+        clearSegment(iiName);
+        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+        f.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+        long date1 = 0;
+        long date2 = f.parse("2015-01-01").getTime();
+        return buildSegment(iiName, date1, date2);
+    }
+
+    private IISegment buildSegment(String iiName, long startDate, long endDate) throws Exception {
+        IIInstance iiInstance = iiManager.getII(iiName);
+        IISegment segment = iiManager.buildSegment(iiInstance, startDate, endDate);
+        iiInstance.getSegments().add(segment);
+        iiManager.updateII(iiInstance,true);
+        return segment;
+    }
+
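+    // Read the intermediate table, sort the rows by timestamp, and stream them into an IIStreamBuilder that fills the segment's HTable.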
+    private void buildII(String iiName) throws Exception {
+        final IIDesc desc = iiManager.getII(iiName).getDescriptor();
+        final String tableName = createIntermediateTable(desc, kylinConfig);
+        logger.info("intermediate table name:" + tableName);
+        final Configuration conf = new Configuration();
+        HCatInputFormat.setInput(conf, "default", tableName);
+        final HCatSchema tableSchema = HCatInputFormat.getTableSchema(conf);
+        logger.info(StringUtils.join(tableSchema.getFieldNames(), "\n"));
+        HiveTableReader reader = new HiveTableReader("default", tableName);
+        final List<TblColRef> tblColRefs = desc.listAllColumns();
+        for (TblColRef tblColRef : tblColRefs) {
+            if (desc.isMetricsCol(tblColRef)) {
+                logger.info("metrics:" + tblColRef.getName());
+            } else {
+                logger.info("dimension:" + tblColRef.getName());
+            }
+        }
+        LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<StreamMessage>();
+        final IISegment segment = createSegment(iiName);
+        String[] args = new String[] { "-iiname", iiName, "-htablename", segment.getStorageLocationIdentifier() };
+        ToolRunner.run(new IICreateHTableJob(), args);
+
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        final IIStreamBuilder streamBuilder = new IIStreamBuilder(queue, iiName, segment.getStorageLocationIdentifier(), segment.getIIDesc(), 0);
+
+        List<String[]> sorted = getSortedRows(reader, desc.getTimestampColumn());
+        int count = sorted.size();
+        for (String[] row : sorted) {
+            logger.info("another row: " + StringUtils.join(row, ","));
+            queue.put(parse(row));
+        }
+
+        reader.close();
+        logger.info("total record count:" + count + " htable:" + segment.getStorageLocationIdentifier());
+        queue.put(StreamMessage.EOF);
+        final Future<?> future = executorService.submit(streamBuilder);
+        try {
+            future.get();
+        } catch (Exception e) {
+            logger.error("stream build failed", e);
+            fail("stream build failed");
+        }
+
+        logger.info("stream build finished, htable name:" + segment.getStorageLocationIdentifier());
+    }
+
+    @Test
+    public void test() throws Exception {
+        for (String iiName : II_NAME) {
+            buildII(iiName);
+            IIInstance ii = iiManager.getII(iiName);
+            if (ii.getStatus() != RealizationStatusEnum.READY) {
+                ii.setStatus(RealizationStatusEnum.READY);
+                iiManager.updateII(ii,true);
+            }
+        }
+    }
+
+    private StreamMessage parse(String[] row) {
+        return new StreamMessage(System.currentTimeMillis(), StringUtils.join(row, ",").getBytes());
+    }
+
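+    // Load all rows from the Hive reader and sort them by the timestamp column so they arrive in time order.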
+    private List<String[]> getSortedRows(HiveTableReader reader, final int tsCol) throws IOException {
+        List<String[]> unsorted = Lists.newArrayList();
+        while (reader.next()) {
+            unsorted.add(reader.getRow());
+        }
+        Collections.sort(unsorted, new Comparator<String[]>() {
+            @Override
+            public int compare(String[] o1, String[] o2) {
+                long t1 = DateFormat.stringToMillis(o1[tsCol]);
+                long t2 = DateFormat.stringToMillis(o2[tsCol]);
+                return Long.compare(t1, t2);
+            }
+        });
+        return unsorted;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/29058abc/job/src/test/java/org/apache/kylin/job/ITBuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/ITBuildCubeWithEngineTest.java b/job/src/test/java/org/apache/kylin/job/ITBuildCubeWithEngineTest.java
deleted file mode 100644
index e99bd01..0000000
--- a/job/src/test/java/org/apache/kylin/job/ITBuildCubeWithEngineTest.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.cube.CubingJob;
-import org.apache.kylin.job.cube.CubingJobBuilder;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
-import org.apache.kylin.job.manager.ExecutableManager;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.File;
-import java.lang.reflect.Method;
-import java.text.SimpleDateFormat;
-import java.util.List;
-import java.util.TimeZone;
-import java.util.concurrent.*;
-
-import static org.junit.Assert.assertEquals;
-
-public class ITBuildCubeWithEngineTest {
-
-    private JobEngineConfig jobEngineConfig;
-
-    private CubeManager cubeManager;
-
-    private DefaultScheduler scheduler;
-
-    protected ExecutableManager jobService;
-
-    private static final Log logger = LogFactory.getLog(ITBuildCubeWithEngineTest.class);
-
-    protected void waitForJob(String jobId) {
-        while (true) {
-            AbstractExecutable job = jobService.getJob(jobId);
-            if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR) {
-                break;
-            } else {
-                try {
-                    Thread.sleep(5000);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
-
-    @BeforeClass
-    public static void beforeClass() throws Exception {
-        logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
-        System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this
-    }
-
-    @Before
-    public void before() throws Exception {
-        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
-
-        DeployUtil.initCliWorkDir();
-        DeployUtil.deployMetadata();
-        DeployUtil.overrideJobJarLocations();
-
-
-        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        jobService = ExecutableManager.getInstance(kylinConfig);
-        scheduler = DefaultScheduler.getInstance();
-        scheduler.init(new JobEngineConfig(kylinConfig));
-        if (!scheduler.hasStarted()) {
-            throw new RuntimeException("scheduler has not been started");
-        }
-        cubeManager = CubeManager.getInstance(kylinConfig);
-        jobEngineConfig = new JobEngineConfig(kylinConfig);
-        for (String jobId : jobService.getAllJobIds()) {
-            if(jobService.getJob(jobId) instanceof CubingJob){
-                jobService.deleteJob(jobId);
-            }
-        }
-
-    }
-
-    @After
-    public void after() {
-        HBaseMetadataTestCase.staticCleanupTestMetadata();
-    }
-
-    @Test
-    public void test() throws Exception {
-        DeployUtil.prepareTestData("left", "test_kylin_cube_with_slr_left_join_empty");
-        testInner();
-        testLeft();
-    }
-
-    private void testInner() throws Exception {
-        String[] testCase = new String[]{
-                "testInnerJoinCube",
-                "testInnerJoinCube2",
-        };
-        runTestAndAssertSucceed(testCase);
-    }
-
-    private void testLeft() throws Exception {
-        String[] testCase = new String[]{
-                "testLeftJoinCube",
-                "testLeftJoinCube2",
-        };
-        runTestAndAssertSucceed(testCase);
-    }
-
-    private void runTestAndAssertSucceed(String[] testCase) throws Exception {
-        ExecutorService executorService = Executors.newFixedThreadPool(testCase.length);
-        final CountDownLatch countDownLatch = new CountDownLatch(testCase.length);
-        List<Future<List<String>>> tasks = Lists.newArrayListWithExpectedSize(testCase.length);
-        for (int i = 0; i < testCase.length; i++) {
-            tasks.add(executorService.submit(new TestCallable(testCase[i], countDownLatch)));
-        }
-        countDownLatch.await();
-        try {
-            for (int i = 0; i < tasks.size(); ++i) {
-                Future<List<String>> task = tasks.get(i);
-                final List<String> jobIds = task.get();
-                for (String jobId : jobIds) {
-                    assertJobSucceed(jobId);
-                }
-            }
-        } catch (Exception ex) {
-            logger.error(ex);
-            throw ex;
-        }
-    }
-
-    private void assertJobSucceed(String jobId) {
-        assertEquals("The job '" + jobId + "' is failed.", ExecutableState.SUCCEED, jobService.getOutput(jobId).getState());
-    }
-
-    private class TestCallable implements Callable<List<String>> {
-
-        private final String methodName;
-        private final CountDownLatch countDownLatch;
-
-        public TestCallable(String methodName, CountDownLatch countDownLatch) {
-            this.methodName = methodName;
-            this.countDownLatch = countDownLatch;
-        }
-
-        @SuppressWarnings("unchecked")
-        @Override
-        public List<String> call() throws Exception {
-            try {
-                final Method method = ITBuildCubeWithEngineTest.class.getDeclaredMethod(methodName);
-                method.setAccessible(true);
-                return (List<String>) method.invoke(ITBuildCubeWithEngineTest.this);
-            } finally {
-                countDownLatch.countDown();
-            }
-        }
-    }
-
-    @SuppressWarnings("unused") // called by reflection
-    private List<String> testInnerJoinCube2() throws Exception {
-        clearSegment("test_kylin_cube_with_slr_empty");
-        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
-        f.setTimeZone(TimeZone.getTimeZone("GMT"));
-        long date1 = 0;
-        long date2 = f.parse("2013-01-01").getTime();
-        long date3 = f.parse("2022-01-01").getTime();
-        List<String> result = Lists.newArrayList();
-        result.add(buildSegment("test_kylin_cube_with_slr_empty", date1, date2));
-        result.add(buildSegment("test_kylin_cube_with_slr_empty", date2, date3));
-        return result;
-    }
-
-    @SuppressWarnings("unused") // called by reflection
-    private List<String> testInnerJoinCube() throws Exception {
-        clearSegment("test_kylin_cube_without_slr_empty");
-
-
-        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
-        f.setTimeZone(TimeZone.getTimeZone("GMT"));
-
-        // this cube's start date is 0, end date is 20501112000000
-        long date1 = 0;
-        long date2 = f.parse("2013-01-01").getTime();
-
-
-        // this cube doesn't support incremental build, always do full build
-
-        List<String> result = Lists.newArrayList();
-        result.add(buildSegment("test_kylin_cube_without_slr_empty", date1, date2));
-        return result;
-    }
-
-    @SuppressWarnings("unused") // called by reflection
-    private List<String> testLeftJoinCube2() throws Exception {
-        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
-        f.setTimeZone(TimeZone.getTimeZone("GMT"));
-        List<String> result = Lists.newArrayList();
-        final String cubeName = "test_kylin_cube_without_slr_left_join_empty";
-        // this cube's start date is 0, end date is 20120601000000
-        long dateStart = cubeManager.getCube(cubeName).getDescriptor().getModel().getPartitionDesc().getPartitionDateStart();
-        long dateEnd = f.parse("2012-06-01").getTime();
-
-        clearSegment(cubeName);
-        result.add(buildSegment(cubeName, dateStart, dateEnd));
-
-        // then submit an append job, start date is 20120601000000, end
-        // date is 20220101000000
-        dateStart = f.parse("2012-06-01").getTime();
-        dateEnd = f.parse("2022-01-01").getTime();
-        result.add(buildSegment(cubeName, dateStart, dateEnd));
-        return result;
-
-    }
-
-    @SuppressWarnings("unused") // called by reflection
-    private List<String> testLeftJoinCube() throws Exception {
-        String cubeName = "test_kylin_cube_with_slr_left_join_empty";
-        clearSegment(cubeName);
-
-        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
-        f.setTimeZone(TimeZone.getTimeZone("GMT"));
-        long dateStart = cubeManager.getCube(cubeName).getDescriptor().getModel().getPartitionDesc().getPartitionDateStart();
-        long dateEnd = f.parse("2050-11-12").getTime();
-
-        // this cube's start date is 0, end date is 20501112000000
-        List<String> result = Lists.newArrayList();
-        result.add(buildSegment(cubeName, dateStart, dateEnd));
-        return result;
-
-    }
-
-    private void clearSegment(String cubeName) throws Exception {
-        CubeInstance cube = cubeManager.getCube(cubeName);
-        cube.getSegments().clear();
-        cubeManager.updateCube(cube,true);
-    }
-
-
-    private String buildSegment(String cubeName, long startDate, long endDate) throws Exception {
-        CubeSegment segment = cubeManager.appendSegments(cubeManager.getCube(cubeName), endDate);
-        CubingJobBuilder cubingJobBuilder = new CubingJobBuilder(jobEngineConfig);
-        CubingJob job = cubingJobBuilder.buildJob(segment);
-        jobService.addJob(job);
-        waitForJob(job.getId());
-        return job.getId();
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/29058abc/job/src/test/java/org/apache/kylin/job/ITBuildCubeWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/ITBuildCubeWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/ITBuildCubeWithStreamTest.java
deleted file mode 100644
index 8a26397..0000000
--- a/job/src/test/java/org/apache/kylin/job/ITBuildCubeWithStreamTest.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.job;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.SetMultimap;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hive.hcatalog.data.schema.HCatSchema;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.dict.DictionaryGenerator;
-import org.apache.kylin.dict.lookup.HiveTableReader;
-import org.apache.kylin.job.hadoop.cubev2.IGTRecordWriter;
-import org.apache.kylin.job.hadoop.cubev2.InMemCubeBuilder;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.junit.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-/**
- *
- * This class is going to be deleted
- */
-@Ignore("For dev testing")
-public class ITBuildCubeWithStreamTest {
-
-    private static final Logger logger = LoggerFactory.getLogger(ITBuildCubeWithStreamTest.class);
-
-    private KylinConfig kylinConfig;
-    private CubeManager cubeManager;
-
-    @BeforeClass
-    public static void beforeClass() throws Exception {
-        logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml references this property
-    }
-
-    @Before
-    public void before() throws Exception {
-        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
-        DeployUtil.overrideJobJarLocations();
-
-        kylinConfig = KylinConfig.getInstanceFromEnv();
-        cubeManager = CubeManager.getInstance(kylinConfig);
-
-    }
-
-    @After
-    public void after() {
-        HBaseMetadataTestCase.staticCleanupTestMetadata();
-    }
-
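-    // reads rows from a pre-built intermediate Hive table and feeds them into InMemCubeBuilder
-    // through a blocking queue; the builder runs on a separate thread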
-    @Test
-    public void test() throws Exception {
-        CubeInstance cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
-        final CubeDesc desc = cube.getDescriptor();
-        //   cube.getSegments().clear();
-        //   cubeManager.updateCube(cube);
-
-        CubeSegment cubeSegment = cube.getSegment("19700101000000_20150401000000", SegmentStatusEnum.NEW);
-        Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
-
-        // collect the segment's pre-built dictionaries; the in-memory cube builder uses them to encode dimension values
-        for (DimensionDesc dim : desc.getDimensions()) {
-            // dictionary
-            for (TblColRef col : dim.getColumnRefs()) {
-                if (desc.getRowkey().isUseDictionary(col)) {
-                    Dictionary<?> dict = cubeSegment.getDictionary(col);
-                    if (dict == null) {
-                        throw new IllegalArgumentException("Dictionary for " + col + " was not found.");
-                    }
-                    logger.info("Dictionary for " + col + " was put into dictionary map.");
-                    dictionaryMap.put(col, dict);
-                }
-            }
-        }
-
-//        final String tableName = createIntermediateTable(desc, kylinConfig, null);
-        String tableName = "kylin_intermediate_test_kylin_cube_without_slr_desc_19700101000000_20130112000000_a24dec89_efbd_425f_9a5f_8b78dd1412af"; // has 3089 records;
-//        tableName = "kylin_intermediate_test_kylin_cube_without_slr_desc_19700101000000_20130112000000_a5e1eb5d_da6b_475d_9807_be0b61f03215"; // only 20 rows;
-//        tableName = "kylin_intermediate_test_kylin_cube_without_slr_left_join_desc_19700101000000_20150302000000_0a183367_f245_43d1_8850_1c138c8514c3";
-//        tableName = "kylin_intermediate_test_kylin_cube_without_slr_left_join_desc_19700101000000_20150301000000_ce061464_7962_4642_bd7d_7c3d8fbe9389";
-        tableName = "kylin_intermediate_test_kylin_cube_without_slr_left_join_desc_19700101000000_20150401000000_fb7ae579_d987_4900_a3b7_c60c731cd269"; // 2 million records
-        logger.info("intermediate table name:" + tableName);
-
-
-        ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(10000);
-
-        InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube, dictionaryMap, new ConsoleGTRecordWriter());
-        ExecutorService executorService = Executors.newSingleThreadExecutor();
-        Future<?> future = executorService.submit(cubeBuilder);
-
-        final Configuration conf = new Configuration();
-        HCatInputFormat.setInput(conf, "default", tableName);
-        final HCatSchema tableSchema = HCatInputFormat.getTableSchema(conf);
-        logger.info(StringUtils.join(tableSchema.getFieldNames(), "\n"));
-        HiveTableReader reader = new HiveTableReader("default", tableName);
-        List<String> row;
-        int counter = 0;
-        while (reader.next()) {
-            row = reader.getRowAsList();
-            queue.put(row);
-            counter++;
-            if (counter == 200000) // stop after 200k rows to keep the test run bounded
-                break;
-        }
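-        // an empty row list signals end of input to the builder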
-        queue.put(new ArrayList<String>(0));
-        reader.close();
-
-        try {
-            future.get();
-        } catch (Exception e) {
-            logger.error("stream build failed", e);
-            throw new IOException("Failed to build cube ", e);
-        }
-
-        logger.info("stream build finished");
-    }
-
-
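-    // helper that builds dictionaries from in-memory rows; not invoked by test()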
-    private void buildDictionary(List<List<String>> table, CubeDesc desc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
-        SetMultimap<TblColRef, String> valueMap = HashMultimap.create();
-
-        List<TblColRef> dimColumns = desc.listDimensionColumnsExcludingDerived();
-        for (List<String> row : table) {
-            for (int i = 0; i < dimColumns.size(); i++) {
-                String cell = row.get(i);
-                valueMap.put(dimColumns.get(i), cell);
-            }
-        }
-
-        for (DimensionDesc dim : desc.getDimensions()) {
-            // dictionary
-            for (TblColRef col : dim.getColumnRefs()) {
-                if (desc.getRowkey().isUseDictionary(col)) {
-                    Dictionary dict = DictionaryGenerator.buildDictionaryFromValueList(col.getType(), Collections2.transform(valueMap.get(col), new Function<String, byte[]>() {
-                        @Nullable
-                        @Override
-                        public byte[] apply(String input) {
-                            if (input == null)
-                                return null;
-                            return input.getBytes();
-                        }
-                    }));
-
-                    logger.info("Building dictionary for " + col);
-                    dictionaryMap.put(col, dict);
-                }
-            }
-        }
-
-    }
-
-
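-    // a sink that discards the cube output; set verbose = true to dump each record to stdout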
-    class ConsoleGTRecordWriter implements IGTRecordWriter {
-
-        boolean verbose = false;
-
-        @Override
-        public void write(Long cuboidId, GTRecord record) throws IOException {
-            if (verbose)
-                System.out.println(record.toString());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/29058abc/job/src/test/java/org/apache/kylin/job/ITBuildIIWithEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/ITBuildIIWithEngineTest.java b/job/src/test/java/org/apache/kylin/job/ITBuildIIWithEngineTest.java
deleted file mode 100644
index 23f97d0..0000000
--- a/job/src/test/java/org/apache/kylin/job/ITBuildIIWithEngineTest.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.hadoop.cube.StorageCleanupJob;
-import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
-import org.apache.kylin.job.invertedindex.IIJob;
-import org.apache.kylin.job.invertedindex.IIJobBuilder;
-import org.apache.kylin.job.manager.ExecutableManager;
-import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.junit.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.text.SimpleDateFormat;
-import java.util.List;
-import java.util.TimeZone;
-import java.util.concurrent.*;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * @author shaoshi
- */
-public class ITBuildIIWithEngineTest {
-
-    private JobEngineConfig jobEngineConfig;
-    private IIManager iiManager;
-
-    private DefaultScheduler scheduler;
-    protected ExecutableManager jobService;
-
-    protected static final String[] TEST_II_INSTANCES = new String[] { "test_kylin_ii_inner_join", "test_kylin_ii_left_join" };
-
-    private static final Log logger = LogFactory.getLog(ITBuildIIWithEngineTest.class);
-
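-    // polls the job status every 5 seconds until it reaches SUCCEED or ERROR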
-    protected void waitForJob(String jobId) {
-        while (true) {
-            AbstractExecutable job = jobService.getJob(jobId);
-            if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR) {
-                break;
-            } else {
-                try {
-                    Thread.sleep(5000);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
-
-    @BeforeClass
-    public static void beforeClass() throws Exception {
-        logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml references this property
-    }
-
-    @Before
-    public void before() throws Exception {
-        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
-
-        //DeployUtil.initCliWorkDir();
-        //        DeployUtil.deployMetadata();
-        DeployUtil.overrideJobJarLocations();
-
-        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        jobService = ExecutableManager.getInstance(kylinConfig);
-        scheduler = DefaultScheduler.getInstance();
-        scheduler.init(new JobEngineConfig(kylinConfig));
-        if (!scheduler.hasStarted()) {
-            throw new RuntimeException("scheduler has not been started");
-        }
-        jobEngineConfig = new JobEngineConfig(kylinConfig);
-        for (String jobId : jobService.getAllJobIds()) {
-            if (jobService.getJob(jobId) instanceof IIJob) {
-                jobService.deleteJob(jobId);
-            }
-        }
-
-        iiManager = IIManager.getInstance(kylinConfig);
-        for (String iiInstance : TEST_II_INSTANCES) {
-
-            IIInstance ii = iiManager.getII(iiInstance);
-            if (ii.getStatus() != RealizationStatusEnum.DISABLED) {
-                ii.setStatus(RealizationStatusEnum.DISABLED);
-                iiManager.updateII(ii, true);
-            }
-        }
-    }
-
-    @After
-    public void after() throws Exception {
-
-        for (String iiInstance : TEST_II_INSTANCES) {
-            IIInstance ii = iiManager.getII(iiInstance);
-            if (ii.getStatus() != RealizationStatusEnum.READY) {
-                ii.setStatus(RealizationStatusEnum.READY);
-                iiManager.updateII(ii, true);
-            }
-        }
-        backup();
-    }
-
-    @Test
-    @Ignore
-    public void testBuildII() throws Exception {
-
-        String[] testCase = new String[] { "buildIIInnerJoin", "buildIILeftJoin" };
-        ExecutorService executorService = Executors.newFixedThreadPool(testCase.length);
-        final CountDownLatch countDownLatch = new CountDownLatch(testCase.length);
-        List<Future<List<String>>> tasks = Lists.newArrayListWithExpectedSize(testCase.length);
-        for (int i = 0; i < testCase.length; i++) {
-            tasks.add(executorService.submit(new TestCallable(testCase[i], countDownLatch)));
-        }
-        countDownLatch.await();
-        for (int i = 0; i < tasks.size(); ++i) {
-            Future<List<String>> task = tasks.get(i);
-            final List<String> jobIds = task.get();
-            for (String jobId : jobIds) {
-                assertJobSucceed(jobId);
-            }
-        }
-
-    }
-
-    private void assertJobSucceed(String jobId) {
-        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(jobId).getState());
-    }
-
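-    // invokes the named build method via reflection so the inner-join and left-join builds can run concurrently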
-    private class TestCallable implements Callable<List<String>> {
-
-        private final String methodName;
-        private final CountDownLatch countDownLatch;
-
-        public TestCallable(String methodName, CountDownLatch countDownLatch) {
-            this.methodName = methodName;
-            this.countDownLatch = countDownLatch;
-        }
-
-        @SuppressWarnings("unchecked")
-        @Override
-        public List<String> call() throws Exception {
-            try {
-                final Method method = ITBuildIIWithEngineTest.class.getDeclaredMethod(methodName);
-                method.setAccessible(true);
-                return (List<String>) method.invoke(ITBuildIIWithEngineTest.this);
-            } finally {
-                countDownLatch.countDown();
-            }
-        }
-    }
-
-    protected List<String> buildIIInnerJoin() throws Exception {
-        return buildII(TEST_II_INSTANCES[0]);
-    }
-
-    protected List<String> buildIILeftJoin() throws Exception {
-        return buildII(TEST_II_INSTANCES[1]);
-    }
-
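-    // clears existing segments and builds a single segment from epoch (0) to 2015-01-01 GMT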
-    protected List<String> buildII(String iiName) throws Exception {
-        clearSegment(iiName);
-
-        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
-        f.setTimeZone(TimeZone.getTimeZone("GMT"));
-
-        long date1 = 0;
-        long date2 = f.parse("2015-01-01").getTime();
-
-        List<String> result = Lists.newArrayList();
-        result.add(buildSegment(iiName, date1, date2));
-        return result;
-    }
-
-    private void clearSegment(String iiName) throws Exception {
-        IIInstance ii = iiManager.getII(iiName);
-        ii.getSegments().clear();
-        iiManager.updateII(ii,true);
-    }
-
-    private String buildSegment(String iiName, long startDate, long endDate) throws Exception {
-        IIInstance iiInstance = iiManager.getII(iiName);
-        IISegment segment = iiManager.buildSegment(iiInstance, startDate, endDate);
-        iiInstance.getSegments().add(segment);
-        iiManager.updateII(iiInstance, true);
-        IIJobBuilder iiJobBuilder = new IIJobBuilder(jobEngineConfig);
-        IIJob job = iiJobBuilder.buildJob(segment);
-        jobService.addJob(job);
-        waitForJob(job.getId());
-        return job.getId();
-    }
-
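-    // runs StorageCleanupJob with "--delete true" so that leftover storage from previous builds is actually removed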
-    private int cleanupOldStorage() throws Exception {
-        String[] args = { "--delete", "true" };
-
-        int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
-        return exitCode;
-    }
-
-    private void backup() throws Exception {
-        int exitCode = cleanupOldStorage();
-        if (exitCode == 0) {
-            exportHBaseData();
-        }
-    }
-
-    private void exportHBaseData() throws IOException {
-        ExportHBaseData export = new ExportHBaseData();
-        export.exportTables();
-    }
-
-    public static void main(String[] args) throws Exception {
-        ITBuildIIWithEngineTest instance = new ITBuildIIWithEngineTest();
-
-        ITBuildIIWithEngineTest.beforeClass();
-        instance.before();
-        instance.testBuildII();
-        instance.after();
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/29058abc/job/src/test/java/org/apache/kylin/job/ITBuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/ITBuildIIWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/ITBuildIIWithStreamTest.java
deleted file mode 100644
index cfb7bdb..0000000
--- a/job/src/test/java/org/apache/kylin/job/ITBuildIIWithStreamTest.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.job;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hive.hcatalog.data.schema.HCatSchema;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.dict.lookup.HiveTableReader;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
-import org.apache.kylin.job.common.ShellExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.hadoop.cube.StorageCleanupJob;
-import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.invertedindex.IIStreamBuilder;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import static org.junit.Assert.fail;
-
-/**
- */
-public class ITBuildIIWithStreamTest {
-
-    private static final Logger logger = LoggerFactory.getLogger(ITBuildIIWithStreamTest.class);
-
-    private static final String[] II_NAME = new String[] { "test_kylin_ii_left_join", "test_kylin_ii_inner_join" };
-    private IIManager iiManager;
-    private KylinConfig kylinConfig;
-
-    @BeforeClass
-    public static void beforeClass() throws Exception {
-        logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml references this property
-    }
-
-    @Before
-    public void before() throws Exception {
-        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
-        DeployUtil.overrideJobJarLocations();
-
-        kylinConfig = KylinConfig.getInstanceFromEnv();
-        iiManager = IIManager.getInstance(kylinConfig);
-        for (String iiInstance : II_NAME) {
-
-            IIInstance ii = iiManager.getII(iiInstance);
-            if (ii.getStatus() != RealizationStatusEnum.DISABLED) {
-                ii.setStatus(RealizationStatusEnum.DISABLED);
-                iiManager.updateII(ii,true);
-            }
-        }
-    }
-
-    @AfterClass
-    public static void afterClass() throws Exception {
-        backup();
-    }
-
-    private static int cleanupOldStorage() throws Exception {
-        String[] args = { "--delete", "true" };
-        int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
-        return exitCode;
-    }
-
-    private static void backup() throws Exception {
-        int exitCode = cleanupOldStorage();
-        if (exitCode == 0) {
-            exportHBaseData();
-        }
-    }
-
-    private static void exportHBaseData() throws IOException {
-        ExportHBaseData export = new ExportHBaseData();
-        export.exportTables();
-    }
-
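-    // generates the drop/create/insert Hive statements for the II flat table and runs them through the Kylin CLI executor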
-    private String createIntermediateTable(IIDesc desc, KylinConfig kylinConfig) throws IOException {
-        IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(desc);
-        JobEngineConfig jobEngineConfig = new JobEngineConfig(kylinConfig);
-        final String uuid = UUID.randomUUID().toString();
-        final String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc, uuid);
-        final String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, jobEngineConfig.getHdfsWorkingDirectory() + "/kylin-" + uuid, uuid);
-        String insertDataHqls;
-        try {
-            insertDataHqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, uuid, jobEngineConfig);
-        } catch (IOException e1) {
-            throw new RuntimeException("Failed to generate insert data SQL for intermediate table.", e1);
-        }
-
-        ShellExecutable step = new ShellExecutable();
-        StringBuilder buf = new StringBuilder();
-        buf.append("hive -e \"");
-        buf.append(dropTableHql + "\n");
-        buf.append(createTableHql + "\n");
-        buf.append(insertDataHqls + "\n");
-        buf.append("\"");
-
-        step.setCmd(buf.toString());
-        logger.info(step.getCmd());
-        step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
-        kylinConfig.getCliCommandExecutor().execute(step.getCmd(), null);
-        return intermediateTableDesc.getTableName(uuid);
-    }
-
-    private void clearSegment(String iiName) throws Exception {
-        IIInstance ii = iiManager.getII(iiName);
-        ii.getSegments().clear();
-        iiManager.updateII(ii,true);
-    }
-
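-    // clears existing segments and registers a new one covering epoch (0) to 2015-01-01 GMT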
-    private IISegment createSegment(String iiName) throws Exception {
-        clearSegment(iiName);
-        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
-        f.setTimeZone(TimeZone.getTimeZone("GMT"));
-
-        long date1 = 0;
-        long date2 = f.parse("2015-01-01").getTime();
-        return buildSegment(iiName, date1, date2);
-    }
-
-    private IISegment buildSegment(String iiName, long startDate, long endDate) throws Exception {
-        IIInstance iiInstance = iiManager.getII(iiName);
-        IISegment segment = iiManager.buildSegment(iiInstance, startDate, endDate);
-        iiInstance.getSegments().add(segment);
-        iiManager.updateII(iiInstance,true);
-        return segment;
-    }
-
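-    // end-to-end build: create the intermediate Hive table, create the target HTable,
-    // then stream the rows (sorted by timestamp) into IIStreamBuilder and wait for it to finish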
-    private void buildII(String iiName) throws Exception {
-        final IIDesc desc = iiManager.getII(iiName).getDescriptor();
-        final String tableName = createIntermediateTable(desc, kylinConfig);
-        logger.info("intermediate table name:" + tableName);
-        final Configuration conf = new Configuration();
-        HCatInputFormat.setInput(conf, "default", tableName);
-        final HCatSchema tableSchema = HCatInputFormat.getTableSchema(conf);
-        logger.info(StringUtils.join(tableSchema.getFieldNames(), "\n"));
-        HiveTableReader reader = new HiveTableReader("default", tableName);
-        final List<TblColRef> tblColRefs = desc.listAllColumns();
-        for (TblColRef tblColRef : tblColRefs) {
-            if (desc.isMetricsCol(tblColRef)) {
-                logger.info("metrics: " + tblColRef.getName());
-            } else {
-                logger.info("dimension: " + tblColRef.getName());
-            }
-        }
-        LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<StreamMessage>();
-        final IISegment segment = createSegment(iiName);
-        String[] args = new String[] { "-iiname", iiName, "-htablename", segment.getStorageLocationIdentifier() };
-        ToolRunner.run(new IICreateHTableJob(), args);
-
-        ExecutorService executorService = Executors.newSingleThreadExecutor();
-        final IIStreamBuilder streamBuilder = new IIStreamBuilder(queue, iiName, segment.getStorageLocationIdentifier(), segment.getIIDesc(), 0);
-
-        List<String[]> sorted = getSortedRows(reader, desc.getTimestampColumn());
-        int count = sorted.size();
-        for (String[] row : sorted) {
-            logger.info("another row: " + StringUtils.join(row, ","));
-            queue.put(parse(row));
-        }
-
-        reader.close();
-        logger.info("total record count:" + count + " htable:" + segment.getStorageLocationIdentifier());
-        queue.put(StreamMessage.EOF);
-        final Future<?> future = executorService.submit(streamBuilder);
-        try {
-            future.get();
-        } catch (Exception e) {
-            logger.error("stream build failed", e);
-            fail("stream build failed");
-        }
-
-        logger.info("stream build finished, htable name:" + segment.getStorageLocationIdentifier());
-    }
-
-    @Test
-    public void test() throws Exception {
-        for (String iiName : II_NAME) {
-            buildII(iiName);
-            IIInstance ii = iiManager.getII(iiName);
-            if (ii.getStatus() != RealizationStatusEnum.READY) {
-                ii.setStatus(RealizationStatusEnum.READY);
-                iiManager.updateII(ii,true);
-            }
-        }
-    }
-
-    private StreamMessage parse(String[] row) {
-        return new StreamMessage(System.currentTimeMillis(), StringUtils.join(row, ",").getBytes());
-    }
-
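-    // reads all rows and sorts them ascending by the timestamp column so the stream builder receives them in time order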
-    private List<String[]> getSortedRows(HiveTableReader reader, final int tsCol) throws IOException {
-        List<String[]> unsorted = Lists.newArrayList();
-        while (reader.next()) {
-            unsorted.add(reader.getRow());
-        }
-        Collections.sort(unsorted, new Comparator<String[]>() {
-            @Override
-            public int compare(String[] o1, String[] o2) {
-                long t1 = DateFormat.stringToMillis(o1[tsCol]);
-                long t2 = DateFormat.stringToMillis(o2[tsCol]);
-                return Long.compare(t1, t2);
-            }
-        });
-        return unsorted;
-    }
-
-}