You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/09/19 02:27:58 UTC

[09/12] incubator-kylin git commit: KYLIN-1010 Decompose project job

KYLIN-1010 Decompose project job


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/c44caa7b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/c44caa7b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/c44caa7b

Branch: refs/heads/2.x-staging
Commit: c44caa7b18c76f399dc7008a189e61dc7a2e4a5f
Parents: 37fa541
Author: Li, Yang <ya...@ebay.com>
Authored: Thu Sep 17 17:44:40 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sat Sep 19 08:10:32 2015 +0800

----------------------------------------------------------------------
 assembly/pom.xml                                | 170 +++--
 .../engine/spark/BuildCubeWithSparkTest.java    | 148 +++++
 .../kylin/job/BuildCubeWithEngineTest.java      | 283 ++++++++
 .../kylin/job/BuildCubeWithStreamTest.java      | 132 ++++
 .../apache/kylin/job/BuildIIWithEngineTest.java | 250 +++++++
 .../apache/kylin/job/BuildIIWithStreamTest.java | 248 +++++++
 .../java/org/apache/kylin/job/DataGenTest.java  |  56 ++
 .../kylin/job/DeployLocalMetaToRemoteTest.java  |  71 ++
 .../java/org/apache/kylin/job/DeployUtil.java   | 261 ++++++++
 .../org/apache/kylin/job/ExportHBaseData.java   | 160 +++++
 .../job/ITKafkaBasedIIStreamBuilderTest.java    |  85 +++
 .../apache/kylin/job/dataGen/ColumnConfig.java  |  71 ++
 .../kylin/job/dataGen/FactTableGenerator.java   | 647 +++++++++++++++++++
 .../org/apache/kylin/job/dataGen/GenConfig.java |  81 +++
 .../kylin/job/hadoop/invertedindex/IITest.java  | 240 +++++++
 .../job/streaming/CubeStreamConsumerTest.java   |  90 +++
 .../streaming/PeriodicalStreamBuilderTest.java  | 144 +++++
 .../streaming/StreamingTableDataGenerator.java  |  76 +++
 .../hive/ITHiveSourceTableLoaderTest.java       |  58 ++
 .../source/hive/ITHiveTableReaderTest.java      |  49 ++
 .../source/hive/ITSnapshotManagerTest.java      |  83 +++
 build/script/prepare_libs.sh                    |   2 +-
 .../java/org/apache/kylin/job/JobInstance.java  |   1 +
 .../kylin/job/dao/ExecutableOutputPO.java       |   1 +
 .../org/apache/kylin/job/dao/ExecutablePO.java  |   1 +
 .../job/impl/threadpool/BaseSchedulerTest.java  | 101 +++
 .../impl/threadpool/DefaultSchedulerTest.java   | 151 +++++
 core-storage/pom.xml                            |   6 -
 engine-spark/pom.xml                            |   8 -
 .../apache/kylin/engine/spark/SparkCubing.java  |  57 +-
 .../engine/spark/BuildCubeWithSparkTest.java    | 148 -----
 engine-streaming/pom.xml                        |   5 -
 invertedindex/pom.xml                           |  46 +-
 .../dict/CreateInvertedIndexDictionaryJob.java  |  70 ++
 .../job/hadoop/invertedindex/IIBulkLoadJob.java |  74 +++
 .../hadoop/invertedindex/IICreateHFileJob.java  |  81 +++
 .../invertedindex/IICreateHFileMapper.java      |  55 ++
 .../hadoop/invertedindex/IICreateHTableJob.java | 148 +++++
 .../invertedindex/IIDeployCoprocessorCLI.java   | 157 +++++
 .../IIDistinctColumnsCombiner.java              |  58 ++
 .../invertedindex/IIDistinctColumnsJob.java     | 136 ++++
 .../invertedindex/IIDistinctColumnsMapper.java  |  66 ++
 .../invertedindex/IIDistinctColumnsReducer.java |  77 +++
 .../hadoop/invertedindex/InvertedIndexJob.java  | 164 +++++
 .../invertedindex/InvertedIndexMapper.java      |  90 +++
 .../invertedindex/InvertedIndexPartitioner.java |  73 +++
 .../invertedindex/InvertedIndexReducer.java     | 100 +++
 .../apache/kylin/job/invertedindex/IIJob.java   |  50 ++
 .../kylin/job/invertedindex/IIJobBuilder.java   | 230 +++++++
 .../java/org/apache/kylin/job/tools/IICLI.java  | 106 +++
 job/.gitignore                                  |   1 -
 job/.settings/org.eclipse.core.resources.prefs  |   6 -
 job/.settings/org.eclipse.jdt.core.prefs        | 379 -----------
 job/.settings/org.eclipse.jdt.ui.prefs          |   7 -
 job/pom.xml                                     | 314 ---------
 .../dict/CreateInvertedIndexDictionaryJob.java  |  70 --
 .../job/hadoop/invertedindex/IIBulkLoadJob.java |  83 ---
 .../hadoop/invertedindex/IICreateHFileJob.java  |  91 ---
 .../invertedindex/IICreateHFileMapper.java      |  55 --
 .../hadoop/invertedindex/IICreateHTableJob.java | 156 -----
 .../IIDistinctColumnsCombiner.java              |  58 --
 .../invertedindex/IIDistinctColumnsJob.java     | 136 ----
 .../invertedindex/IIDistinctColumnsMapper.java  |  66 --
 .../invertedindex/IIDistinctColumnsReducer.java |  77 ---
 .../hadoop/invertedindex/InvertedIndexJob.java  | 164 -----
 .../invertedindex/InvertedIndexMapper.java      |  90 ---
 .../invertedindex/InvertedIndexPartitioner.java |  73 ---
 .../invertedindex/InvertedIndexReducer.java     | 100 ---
 .../apache/kylin/job/invertedindex/IIJob.java   |  50 --
 .../kylin/job/invertedindex/IIJobBuilder.java   | 230 -------
 .../java/org/apache/kylin/job/tools/IICLI.java  | 106 ---
 .../kylin/job/BuildCubeWithEngineTest.java      | 283 --------
 .../kylin/job/BuildCubeWithStreamTest.java      | 132 ----
 .../apache/kylin/job/BuildIIWithEngineTest.java | 250 -------
 .../apache/kylin/job/BuildIIWithStreamTest.java | 248 -------
 .../java/org/apache/kylin/job/DataGenTest.java  |  56 --
 .../kylin/job/DeployLocalMetaToRemoteTest.java  |  71 --
 .../java/org/apache/kylin/job/DeployUtil.java   | 262 --------
 .../org/apache/kylin/job/ExportHBaseData.java   | 160 -----
 .../job/ITKafkaBasedIIStreamBuilderTest.java    |  85 ---
 .../apache/kylin/job/dataGen/ColumnConfig.java  |  71 --
 .../kylin/job/dataGen/FactTableGenerator.java   | 647 -------------------
 .../org/apache/kylin/job/dataGen/GenConfig.java |  81 ---
 .../job/dataGen/StreamingDataGenerator.java     |  83 ---
 .../kylin/job/hadoop/invertedindex/IITest.java  | 240 -------
 .../job/impl/threadpool/BaseSchedulerTest.java  | 109 ----
 .../impl/threadpool/DefaultSchedulerTest.java   | 150 -----
 .../job/streaming/CubeStreamConsumerTest.java   |  90 ---
 .../streaming/PeriodicalStreamBuilderTest.java  | 144 -----
 .../streaming/StreamingTableDataGenerator.java  |  76 ---
 .../job/tools/ColumnCardinalityReducerTest.java | 115 ----
 .../hive/ITHiveSourceTableLoaderTest.java       |  58 --
 .../source/hive/ITHiveTableReaderTest.java      |  49 --
 .../source/hive/ITSnapshotManagerTest.java      |  83 ---
 job/src/test/resources/data/flat_table/000000_0 | Bin 110778 -> 0 bytes
 .../resources/data/test_cal_dt/part-r-00000     | 366 -----------
 .../expected_result/flat_item/part-r-00000      | Bin 565 -> 0 bytes
 .../jarfile/SampleBadJavaProgram.jarfile        | Bin 1006 -> 0 bytes
 .../resources/jarfile/SampleJavaProgram.jarfile | Bin 1166 -> 0 bytes
 .../test/resources/json/dummy_jobinstance.json  | 195 ------
 pom.xml                                         |   1 -
 query/pom.xml                                   |   6 +
 server/pom.xml                                  |  15 +-
 .../java/org/apache/kylin/rest/DebugTomcat.java |   2 +-
 .../rest/security/RealAclHBaseStorage.java      |   2 +-
 .../apache/kylin/rest/service/CubeService.java  |   2 +-
 .../apache/kylin/rest/service/QueryService.java |   2 +-
 source-hive/pom.xml                             |   6 +
 .../ColumnCardinalityReducerTest.java           | 115 ++++
 source-kafka/pom.xml                            |  11 +-
 storage-hbase/pom.xml                           |   1 -
 .../kylin/storage/hbase/HBaseConnection.java    | 234 +++++++
 .../kylin/storage/hbase/HBaseResourceStore.java |   1 -
 .../storage/hbase/cube/v1/CubeStorageQuery.java |   2 +-
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     |   3 +-
 .../storage/hbase/cube/v2/CubeHBaseScanRPC.java |   2 +-
 .../hbase/ii/InvertedIndexStorageQuery.java     |   2 +-
 .../storage/hbase/steps/HBaseConnection.java    | 234 -------
 .../hbase/steps/HBaseStreamingOutput.java       |   1 +
 .../hbase/util/DeployCoprocessorCLI.java        |   6 +-
 .../hbase/util/GridTableHBaseBenchmark.java     |   2 +-
 .../kylin/storage/hbase/util/PingHBaseCLI.java  |   2 +-
 .../storage/hbase/util/ZookeeperJobLock.java    |   3 +-
 .../hbase/ii/ITInvertedIndexHBaseTest.java      |   2 +-
 .../storage/hbase/steps/HbaseImporter.java      |   1 +
 streaming/pom.xml                               |   6 +-
 .../kylin/job/streaming/CubeStreamConsumer.java |   2 +-
 .../kylin/job/streaming/StreamingBootstrap.java |   2 +-
 .../invertedindex/IIStreamConsumer.java         |   2 +-
 129 files changed, 5792 insertions(+), 6959 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index bc758f7..99557fb 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -15,7 +15,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-engine-streaming</artifactId>
+            <artifactId>kylin-source-hive</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
         <dependency>
@@ -35,17 +35,141 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-source-hive</artifactId>
+            <artifactId>kylin-engine-spark</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
-
-
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-engine-streaming</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        
         <dependency>
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-invertedindex</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-streaming</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
         
+        <!-- Env & Test -->
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-common</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-job</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-storage-hbase</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-annotations</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-minicluster</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.mrunit</groupId>
+            <artifactId>mrunit</artifactId>
+            <classifier>hadoop2</classifier>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-hadoop2-compat</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-server</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.maven</groupId>
+            <artifactId>maven-model</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive.hcatalog</groupId>
+            <artifactId>hive-hcatalog-core</artifactId>
+            <version>${hive-hcatalog.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-testing-util</artifactId>
+            <version>${hbase-hadoop2.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>servlet-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.servlet.jsp</groupId>
+                    <artifactId>jsp-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
 
     <build>
@@ -57,36 +181,6 @@
 
                 <executions>
                     <execution>
-                        <id>shade-streaming</id>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>shade</goal>
-                        </goals>
-                        <configuration>
-                            <minimizeJar>false</minimizeJar>
-                            <shadedArtifactAttached>true</shadedArtifactAttached>
-                            <shadedClassifierName>streaming</shadedClassifierName>
-                            <filters>
-                                <filter>
-                                    <artifact>*:*</artifact>
-                                    <excludes>
-                                        <exclude>META-INF/*.SF</exclude>
-                                        <exclude>META-INF/*.DSA</exclude>
-                                        <exclude>META-INF/*.RSA</exclude>
-                                    </excludes>
-                                </filter>
-                            </filters>
-                            <artifactSet>
-                                <excludes>
-                                    <exclude>org.apache.kylin:kylin-invertedindex</exclude>
-                                    <exclude>org.apache.kylin:kylin-engine-mr</exclude>
-                                    <exclude>org.apache.kylin:kylin-source-hive</exclude>
-                                </excludes>
-                            </artifactSet>
-                        </configuration>
-                    </execution>
-                    <execution>
-                        <id>shade-mr</id>
                         <phase>package</phase>
                         <goals>
                             <goal>shade</goal>
@@ -94,7 +188,7 @@
                         <configuration>
                             <minimizeJar>false</minimizeJar>
                             <shadedArtifactAttached>true</shadedArtifactAttached>
-                            <shadedClassifierName>mr</shadedClassifierName>
+                            <shadedClassifierName>job</shadedClassifierName>
                             <filters>
                                 <filter>
                                     <artifact>*:*</artifact>
@@ -105,12 +199,6 @@
                                     </excludes>
                                 </filter>
                             </filters>
-                            <artifactSet>
-                                <excludes>
-                                    <exclude>org.apache.kylin:kylin-invertedindex</exclude>
-                                    <exclude>org.apache.kylin:kylin-engine-streaming</exclude>
-                                </excludes>
-                            </artifactSet>
                         </configuration>
                     </execution>
                 </executions>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/assembly/src/test/java/org/apache/kylin/engine/spark/BuildCubeWithSparkTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/engine/spark/BuildCubeWithSparkTest.java b/assembly/src/test/java/org/apache/kylin/engine/spark/BuildCubeWithSparkTest.java
new file mode 100644
index 0000000..d24cc79
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/engine/spark/BuildCubeWithSparkTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.spark;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.job.DeployUtil;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
+import org.apache.kylin.job.lock.MockJobLock;
+import org.apache.kylin.job.manager.ExecutableManager;
+import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class BuildCubeWithSparkTest {
+
+    private CubeManager cubeManager;
+    private DefaultScheduler scheduler;
+    protected ExecutableManager jobService;
+
+    private static final Log logger = LogFactory.getLog(BuildCubeWithSparkTest.class);
+
+    protected void waitForJob(String jobId) {
+        while (true) {
+            AbstractExecutable job = jobService.getJob(jobId);
+            if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR) {
+                break;
+            } else {
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+        System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this
+    }
+
+    @Before
+    public void before() throws Exception {
+        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+
+        DeployUtil.initCliWorkDir();
+        DeployUtil.deployMetadata();
+        DeployUtil.overrideJobJarLocations();
+
+        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        jobService = ExecutableManager.getInstance(kylinConfig);
+        for (String jobId : jobService.getAllJobIds()) {
+            jobService.deleteJob(jobId);
+        }
+        scheduler = DefaultScheduler.getInstance();
+        scheduler.init(new JobEngineConfig(kylinConfig), new MockJobLock());
+        if (!scheduler.hasStarted()) {
+            throw new RuntimeException("scheduler has not been started");
+        }
+        cubeManager = CubeManager.getInstance(kylinConfig);
+
+    }
+
+    @After
+    public void after() {
+        HBaseMetadataTestCase.staticCleanupTestMetadata();
+    }
+
+    @Test
+    public void test() throws Exception {
+        final CubeSegment segment = createSegment();
+        String confPath = new File(AbstractKylinTestCase.SANDBOX_TEST_DATA).getAbsolutePath();
+        KylinConfig.getInstanceFromEnv().getCoprocessorLocalJar();
+        String coprocessor = KylinConfig.getInstanceFromEnv().getCoprocessorLocalJar();
+        logger.info("confPath location:" + confPath);
+        logger.info("coprocessor location:" + coprocessor);
+        final DefaultChainedExecutable cubingJob = new SparkBatchCubingEngine(confPath, coprocessor).createBatchCubingJob(segment, "BuildCubeWithSpark");
+        jobService.addJob(cubingJob);
+        waitForJob(cubingJob.getId());
+        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(cubingJob.getId()).getState());
+    }
+
+    private void clearSegment(String cubeName) throws Exception {
+        CubeInstance cube = cubeManager.getCube(cubeName);
+        // remove all existing segments
+        CubeUpdate cubeBuilder = new CubeUpdate(cube);
+        cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
+        cubeManager.updateCube(cubeBuilder);
+    }
+
+    private CubeSegment createSegment() throws Exception {
+        String cubeName = "test_kylin_cube_with_slr_left_join_empty";
+        clearSegment(cubeName);
+
+        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+        f.setTimeZone(TimeZone.getTimeZone("GMT"));
+        long dateStart = cubeManager.getCube(cubeName).getDescriptor().getModel().getPartitionDesc().getPartitionDateStart();
+        long dateEnd = f.parse("2050-11-12").getTime();
+
+        // this cube's start date is 0, end date is 20501112000000
+        List<String> result = Lists.newArrayList();
+        return cubeManager.appendSegments(cubeManager.getCube(cubeName), dateEnd);
+
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
new file mode 100644
index 0000000..d7eb3cf
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.TimeZone;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
+import org.apache.kylin.job.manager.ExecutableManager;
+import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
+import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class BuildCubeWithEngineTest {
+
+    private CubeManager cubeManager;
+    private DefaultScheduler scheduler;
+    protected ExecutableManager jobService;
+
+    private static final Log logger = LogFactory.getLog(BuildCubeWithEngineTest.class);
+
+    protected void waitForJob(String jobId) {
+        while (true) {
+            AbstractExecutable job = jobService.getJob(jobId);
+            if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR) {
+                break;
+            } else {
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+        System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this
+    }
+
+    @Before
+    public void before() throws Exception {
+        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+
+        DeployUtil.initCliWorkDir();
+        DeployUtil.deployMetadata();
+        DeployUtil.overrideJobJarLocations();
+
+        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        jobService = ExecutableManager.getInstance(kylinConfig);
+        scheduler = DefaultScheduler.getInstance();
+        scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
+        if (!scheduler.hasStarted()) {
+            throw new RuntimeException("scheduler has not been started");
+        }
+        cubeManager = CubeManager.getInstance(kylinConfig);
+        for (String jobId : jobService.getAllJobIds()) {
+            if (jobService.getJob(jobId) instanceof CubingJob) {
+                jobService.deleteJob(jobId);
+            }
+        }
+
+    }
+
+    @After
+    public void after() {
+        HBaseMetadataTestCase.staticCleanupTestMetadata();
+    }
+
+    @Test
+    public void test() throws Exception {
+        DeployUtil.prepareTestDataForNormalCubes("test_kylin_cube_with_slr_left_join_empty");
+        testInner();
+        testLeft();
+    }
+
+    private void testInner() throws Exception {
+        String[] testCase = new String[] { "testInnerJoinCube", "testInnerJoinCube2", };
+        runTestAndAssertSucceed(testCase);
+    }
+
+    private void testLeft() throws Exception {
+        String[] testCase = new String[] { "testLeftJoinCube", "testLeftJoinCube2", };
+        runTestAndAssertSucceed(testCase);
+    }
+
+    private void runTestAndAssertSucceed(String[] testCase) throws Exception {
+        ExecutorService executorService = Executors.newFixedThreadPool(testCase.length);
+        final CountDownLatch countDownLatch = new CountDownLatch(testCase.length);
+        List<Future<List<String>>> tasks = Lists.newArrayListWithExpectedSize(testCase.length);
+        for (int i = 0; i < testCase.length; i++) {
+            tasks.add(executorService.submit(new TestCallable(testCase[i], countDownLatch)));
+        }
+        countDownLatch.await();
+        try {
+            for (int i = 0; i < tasks.size(); ++i) {
+                Future<List<String>> task = tasks.get(i);
+                final List<String> jobIds = task.get();
+                for (String jobId : jobIds) {
+                    assertJobSucceed(jobId);
+                }
+            }
+        } catch (Exception ex) {
+            logger.error(ex);
+            throw ex;
+        }
+    }
+
+    private void assertJobSucceed(String jobId) {
+        assertEquals("The job '" + jobId + "' is failed.", ExecutableState.SUCCEED, jobService.getOutput(jobId).getState());
+    }
+
+    /**
+     * Invokes one private no-arg method of the enclosing test by name and returns the job ids
+     * it produced. The latch is counted down whether the invocation succeeds or throws.
+     */
+    private class TestCallable implements Callable<List<String>> {
+
+        private final String targetMethodName;
+        private final CountDownLatch doneSignal;
+
+        public TestCallable(String methodName, CountDownLatch countDownLatch) {
+            this.targetMethodName = methodName;
+            this.doneSignal = countDownLatch;
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public List<String> call() throws Exception {
+            try {
+                final Method target = BuildCubeWithEngineTest.class.getDeclaredMethod(targetMethodName);
+                // the builder methods are private, so access checks must be lifted before invoking
+                target.setAccessible(true);
+                final Object jobIds = target.invoke(BuildCubeWithEngineTest.this);
+                return (List<String>) jobIds;
+            } finally {
+                doneSignal.countDown();
+            }
+        }
+    }
+
+    @SuppressWarnings("unused")
+    // called by reflection
+    private List<String> testInnerJoinCube2() throws Exception {
+        clearSegment("test_kylin_cube_with_slr_empty");
+        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+        f.setTimeZone(TimeZone.getTimeZone("GMT"));
+        long date1 = 0;
+        long date2 = f.parse("2013-01-01").getTime();
+        long date3 = f.parse("2022-01-01").getTime();
+        List<String> result = Lists.newArrayList();
+        result.add(buildSegment("test_kylin_cube_with_slr_empty", date1, date2));
+        result.add(buildSegment("test_kylin_cube_with_slr_empty", date2, date3));
+        return result;
+    }
+
+    @SuppressWarnings("unused")
+    // called by reflection
+    private List<String> testInnerJoinCube() throws Exception {
+        clearSegment("test_kylin_cube_without_slr_empty");
+
+        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+        f.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+        // this cube's start date is 0, end date is 20501112000000
+        long date1 = 0;
+        long date2 = f.parse("2050-01-11").getTime();
+
+        // this cube doesn't support incremental build, always do full build
+
+        List<String> result = Lists.newArrayList();
+        result.add(buildSegment("test_kylin_cube_without_slr_empty", date1, date2));
+        return result;
+    }
+
+    /**
+     * Clears "test_kylin_cube_without_slr_left_join_empty" and builds three segments for it in
+     * sequence, ending at 2012-06-01, 2022-01-01 and 2023-01-01 (all GMT).
+     *
+     * @return the ids of the three build jobs, in submission order
+     */
+    @SuppressWarnings("unused")
+    // called by reflection
+    private List<String> testLeftJoinCube2() throws Exception {
+        final String cubeName = "test_kylin_cube_without_slr_left_join_empty";
+        final SimpleDateFormat dayFormat = new SimpleDateFormat("yyyy-MM-dd");
+        dayFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+        // the first range starts at the partition start date configured on the cube's model
+        final long partitionStart = cubeManager.getCube(cubeName).getDescriptor().getModel().getPartitionDesc().getPartitionDateStart();
+        final long firstEnd = dayFormat.parse("2012-06-01").getTime();
+        final long secondEnd = dayFormat.parse("2022-01-01").getTime();
+        final long thirdEnd = dayFormat.parse("2023-01-01").getTime();
+
+        clearSegment(cubeName);
+
+        final List<String> jobIds = Lists.newArrayList();
+        jobIds.add(buildSegment(cubeName, partitionStart, firstEnd));
+        // then submit an append job for the following range
+        jobIds.add(buildSegment(cubeName, firstEnd, secondEnd));
+        // build an empty segment which doesn't have data
+        jobIds.add(buildSegment(cubeName, secondEnd, thirdEnd));
+        return jobIds;
+    }
+
+    @SuppressWarnings("unused")
+    // called by reflection
+    private List<String> testLeftJoinCube() throws Exception {
+        String cubeName = "test_kylin_cube_with_slr_left_join_empty";
+        clearSegment(cubeName);
+
+        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+        f.setTimeZone(TimeZone.getTimeZone("GMT"));
+        long dateStart = cubeManager.getCube(cubeName).getDescriptor().getModel().getPartitionDesc().getPartitionDateStart();
+        long dateEnd = f.parse("2050-11-12").getTime();
+
+        // this cube's start date is 0, end date is 20501112000000
+        List<String> result = Lists.newArrayList();
+        result.add(buildSegment(cubeName, dateStart, dateEnd));
+        return result;
+
+    }
+
+    private void clearSegment(String cubeName) throws Exception {
+        CubeInstance cube = cubeManager.getCube(cubeName);
+        // remove all existing segments
+        CubeUpdate cubeBuilder = new CubeUpdate(cube);
+        cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
+        cubeManager.updateCube(cubeBuilder);
+    }
+
+    private String buildSegment(String cubeName, long startDate, long endDate) throws Exception {
+        CubeSegment segment = cubeManager.appendSegments(cubeManager.getCube(cubeName), endDate);
+        DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
+        jobService.addJob(job);
+        waitForJob(job.getId());
+        return job.getId();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
new file mode 100644
index 0000000..b02b2f2
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
@@ -0,0 +1,132 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.job;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.job.streaming.BootstrapConfig;
+import org.apache.kylin.job.streaming.StreamingBootstrap;
+import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
+import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
+import org.apache.kylin.streaming.StreamingConfig;
+import org.apache.kylin.streaming.StreamingManager;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  for streaming cubing case "test_streaming_table"
+ */
+public class BuildCubeWithStreamTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStreamTest.class);
+    private static final String streamingName = "test_streaming_table_cube";
+    private static final long startTime = DateFormat.stringToMillis("2015-01-01 00:00:00");
+    private static final long endTime = DateFormat.stringToMillis("2015-01-03 00:00:00");
+    private static final long batchInterval = 16 * 60 * 60 * 1000;//16 hours
+
+    private KylinConfig kylinConfig;
+
+    /** Adds the sandbox test data directory to the classpath and sets the system properties the test relies on. */
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+        System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this
+    }
+
+    /**
+     * Creates the sandbox test metadata, points the streaming config at a fresh random topic
+     * and prepares test data for the [startTime, endTime) range.
+     */
+    @Before
+    public void before() throws Exception {
+        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+        DeployUtil.overrideJobJarLocations();
+
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+
+        //Use a random topic for kafka data stream
+        StreamingConfig streamingConfig = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingName);
+        streamingConfig.setTopic(UUID.randomUUID().toString());
+        StreamingManager.getInstance(kylinConfig).saveStreamingConfig(streamingConfig);
+
+        DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, streamingConfig);
+    }
+
+    /** Backs up the HBase data and then cleans up the test metadata, even if the backup fails. */
+    @AfterClass
+    public static void afterClass() throws Exception {
+        try {
+            backup();
+        } finally {
+            // metadata cleanup must not be skipped when the backup throws
+            HBaseMetadataTestCase.staticCleanupTestMetadata();
+        }
+    }
+
+    /**
+     * Runs the storage cleanup job with deletion enabled.
+     *
+     * @return the exit code reported by {@code ToolRunner}
+     */
+    private static int cleanupOldStorage() throws Exception {
+        String[] args = { "--delete", "true" };
+        return ToolRunner.run(new StorageCleanupJob(), args);
+    }
+
+    /** Exports the HBase data, but only when the preceding storage cleanup exited with code 0. */
+    private static void backup() throws Exception {
+        int exitCode = cleanupOldStorage();
+        if (exitCode == 0) {
+            exportHBaseData();
+        }
+    }
+
+    /** Exports the HBase tables and always tears the exporter down afterwards. */
+    private static void exportHBaseData() throws IOException {
+        ExportHBaseData export = new ExportHBaseData();
+        try {
+            export.exportTables();
+        } finally {
+            // tearDown must run even when the export fails part-way
+            export.tearDown();
+        }
+    }
+
+    /** Starts one one-off streaming bootstrap on partition 0 per batchInterval-sized window between startTime and endTime. */
+    @Test
+    public void test() throws Exception {
+        for (long start = startTime; start < endTime; start += batchInterval) {
+            BootstrapConfig bootstrapConfig = new BootstrapConfig();
+            bootstrapConfig.setStart(start);
+            bootstrapConfig.setEnd(start + batchInterval);
+            bootstrapConfig.setOneOff(true);
+            bootstrapConfig.setPartitionId(0);
+            bootstrapConfig.setStreaming(streamingName);
+            StreamingBootstrap.getInstance(KylinConfig.getInstanceFromEnv()).start(bootstrapConfig);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
new file mode 100644
index 0000000..fecb106
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.TimeZone;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
+import org.apache.kylin.job.invertedindex.IIJob;
+import org.apache.kylin.job.invertedindex.IIJobBuilder;
+import org.apache.kylin.job.manager.ExecutableManager;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
+import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
+import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * @author shaoshi
+ */
+public class BuildIIWithEngineTest {
+
+    private JobEngineConfig jobEngineConfig;
+    private IIManager iiManager;
+
+    private DefaultScheduler scheduler;
+    protected ExecutableManager jobService;
+
+    protected static final String[] TEST_II_INSTANCES = new String[] { "test_kylin_ii_inner_join", "test_kylin_ii_left_join" };
+
+    private static final Log logger = LogFactory.getLog(BuildIIWithEngineTest.class);
+
+    /**
+     * Blocks until the given job reaches SUCCEED or ERROR, polling its status every 5 seconds.
+     *
+     * @param jobId id of a job known to {@code jobService}
+     * @throws IllegalStateException if the waiting thread is interrupted
+     */
+    protected void waitForJob(String jobId) {
+        while (true) {
+            AbstractExecutable job = jobService.getJob(jobId);
+            if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR) {
+                return;
+            }
+            try {
+                Thread.sleep(5000);
+            } catch (InterruptedException e) {
+                // restore the flag and stop waiting; ignoring the interrupt would make this wait uncancellable
+                Thread.currentThread().interrupt();
+                throw new IllegalStateException("Interrupted while waiting for job " + jobId, e);
+            }
+        }
+    }
+
+    /** Adds the sandbox test data directory to the classpath and sets the "hdp.version" system property. */
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        final String sandboxPath = new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath();
+        logger.info("Adding to classpath: " + sandboxPath);
+        ClassUtil.addClasspath(sandboxPath);
+        System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this
+    }
+
+    /**
+     * Creates the sandbox test metadata, initialises the job scheduler, deletes every stored
+     * job that is an {@code IIJob} and switches all test IIs to DISABLED.
+     */
+    @Before
+    public void before() throws Exception {
+        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+        // DeployUtil.initCliWorkDir() and DeployUtil.deployMetadata() are currently disabled here
+        DeployUtil.overrideJobJarLocations();
+
+        final KylinConfig config = KylinConfig.getInstanceFromEnv();
+        jobService = ExecutableManager.getInstance(config);
+
+        scheduler = DefaultScheduler.getInstance();
+        scheduler.init(new JobEngineConfig(config), new ZookeeperJobLock());
+        if (!scheduler.hasStarted()) {
+            throw new RuntimeException("scheduler has not been started");
+        }
+        jobEngineConfig = new JobEngineConfig(config);
+
+        for (String jobId : jobService.getAllJobIds()) {
+            final boolean isIIJob = jobService.getJob(jobId) instanceof IIJob;
+            if (isIIJob) {
+                jobService.deleteJob(jobId);
+            }
+        }
+
+        iiManager = IIManager.getInstance(config);
+        for (String iiName : TEST_II_INSTANCES) {
+            final IIInstance ii = iiManager.getII(iiName);
+            if (ii.getStatus() == RealizationStatusEnum.DISABLED) {
+                continue; // already in the required state, nothing to persist
+            }
+            ii.setStatus(RealizationStatusEnum.DISABLED);
+            iiManager.updateII(ii);
+        }
+    }
+
+    /** Switches every test II back to READY, persisting only those whose status actually changes. */
+    @After
+    public void after() throws Exception {
+        for (String iiName : TEST_II_INSTANCES) {
+            final IIInstance ii = iiManager.getII(iiName);
+            if (ii.getStatus() == RealizationStatusEnum.READY) {
+                continue;
+            }
+            ii.setStatus(RealizationStatusEnum.READY);
+            iiManager.updateII(ii);
+        }
+    }
+
+    /**
+     * Runs the inner-join and left-join II builders on parallel threads and asserts that every
+     * job they submitted ended in SUCCEED.
+     */
+    @Test
+    @Ignore
+    public void testBuildII() throws Exception {
+
+        String[] testCase = new String[] { "buildIIInnerJoin", "buildIILeftJoin" };
+        ExecutorService executorService = Executors.newFixedThreadPool(testCase.length);
+        try {
+            final CountDownLatch countDownLatch = new CountDownLatch(testCase.length);
+            List<Future<List<String>>> tasks = Lists.newArrayListWithExpectedSize(testCase.length);
+            for (String methodName : testCase) {
+                tasks.add(executorService.submit(new TestCallable(methodName, countDownLatch)));
+            }
+            // every TestCallable counts down in a finally block, so this returns even when a builder throws
+            countDownLatch.await();
+            for (Future<List<String>> task : tasks) {
+                for (String jobId : task.get()) {
+                    assertJobSucceed(jobId);
+                }
+            }
+        } finally {
+            // the default pool threads are non-daemon; without a shutdown they outlive the test
+            executorService.shutdown();
+        }
+
+    }
+
+    /**
+     * Fails the test unless the stored output state of the given job is SUCCEED.
+     * The failure message names the job so a failing parallel build can be identified.
+     */
+    private void assertJobSucceed(String jobId) {
+        assertEquals("The job '" + jobId + "' is failed.", ExecutableState.SUCCEED, jobService.getOutput(jobId).getState());
+    }
+
+    /**
+     * Invokes one no-arg method of the enclosing test by name and returns the job ids it
+     * produced. The latch is counted down whether the invocation succeeds or throws.
+     */
+    private class TestCallable implements Callable<List<String>> {
+
+        private final String targetMethodName;
+        private final CountDownLatch doneSignal;
+
+        public TestCallable(String methodName, CountDownLatch countDownLatch) {
+            this.targetMethodName = methodName;
+            this.doneSignal = countDownLatch;
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public List<String> call() throws Exception {
+            try {
+                final Method target = BuildIIWithEngineTest.class.getDeclaredMethod(targetMethodName);
+                target.setAccessible(true);
+                final Object jobIds = target.invoke(BuildIIWithEngineTest.this);
+                return (List<String>) jobIds;
+            } finally {
+                doneSignal.countDown();
+            }
+        }
+    }
+
+    protected List<String> buildIIInnerJoin() throws Exception {
+        return buildII(TEST_II_INSTANCES[0]);
+    }
+
+    protected List<String> buildIILeftJoin() throws Exception {
+        return buildII(TEST_II_INSTANCES[1]);
+    }
+
+    protected List<String> buildII(String iiName) throws Exception {
+        clearSegment(iiName);
+
+        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+        f.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+        long date1 = 0;
+        long date2 = f.parse("2015-01-01").getTime();
+
+        List<String> result = Lists.newArrayList();
+        result.add(buildSegment(iiName, date1, date2));
+        return result;
+    }
+
+    private void clearSegment(String iiName) throws Exception {
+        IIInstance ii = iiManager.getII(iiName);
+        ii.getSegments().clear();
+        iiManager.updateII(ii);
+    }
+
+    private String buildSegment(String iiName, long startDate, long endDate) throws Exception {
+        IIInstance iiInstance = iiManager.getII(iiName);
+        IISegment segment = iiManager.buildSegment(iiInstance, startDate, endDate);
+        iiInstance.getSegments().add(segment);
+        iiManager.updateII(iiInstance);
+        IIJobBuilder iiJobBuilder = new IIJobBuilder(jobEngineConfig);
+        IIJob job = iiJobBuilder.buildJob(segment, "TEST");
+        jobService.addJob(job);
+        waitForJob(job.getId());
+        return job.getId();
+    }
+
+    private int cleanupOldStorage() throws Exception {
+        String[] args = { "--delete", "true" };
+
+        int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
+        return exitCode;
+    }
+
+    /**
+     * Runs the II build outside of JUnit: class setup, per-test setup, the build itself and
+     * the per-test teardown, in that order.
+     */
+    public static void main(String[] args) throws Exception {
+        BuildIIWithEngineTest instance = new BuildIIWithEngineTest();
+
+        BuildIIWithEngineTest.beforeClass();
+        instance.before();
+        try {
+            instance.testBuildII();
+        } finally {
+            // mirror JUnit semantics: the teardown runs even when the test body throws
+            instance.after();
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
new file mode 100644
index 0000000..5ca3b29
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -0,0 +1,248 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.job;
+
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.TimeZone;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
+import org.apache.kylin.job.common.ShellExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.source.hive.HiveTableReader;
+import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
+import org.apache.kylin.streaming.StreamBuilder;
+import org.apache.kylin.streaming.StreamMessage;
+import org.apache.kylin.streaming.invertedindex.IIStreamConsumer;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ */
+public class BuildIIWithStreamTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(BuildIIWithStreamTest.class);
+
+    private static final String[] II_NAME = new String[] { "test_kylin_ii_left_join", "test_kylin_ii_inner_join" };
+    private IIManager iiManager;
+    private KylinConfig kylinConfig;
+
+    /** Adds the sandbox test data directory to the classpath and sets the "hdp.version" system property. */
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        final String sandboxPath = new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath();
+        logger.info("Adding to classpath: " + sandboxPath);
+        ClassUtil.addClasspath(sandboxPath);
+        System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this
+    }
+
+    /** Creates the sandbox test metadata and switches every II listed in {@code II_NAME} to DISABLED. */
+    @Before
+    public void before() throws Exception {
+        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+        DeployUtil.overrideJobJarLocations();
+
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+        iiManager = IIManager.getInstance(kylinConfig);
+        for (String iiInstance : II_NAME) {
+
+            IIInstance ii = iiManager.getII(iiInstance);
+            // persist only when the status actually changes
+            if (ii.getStatus() != RealizationStatusEnum.DISABLED) {
+                ii.setStatus(RealizationStatusEnum.DISABLED);
+                iiManager.updateII(ii);
+            }
+        }
+    }
+
+    /**
+     * Generates the USE / DROP / CREATE / INSERT statements for the II's joined flat table and
+     * runs them in a single {@code hive -e} command through the CLI command executor.
+     *
+     * @param desc        descriptor of the II whose flat table is created
+     * @param kylinConfig configuration supplying the Hive database name and the command executor
+     * @return the name of the intermediate table
+     * @throws RuntimeException if the insert statement cannot be generated (the IOException is chained as cause)
+     */
+    private String createIntermediateTable(IIDesc desc, KylinConfig kylinConfig) throws IOException {
+        IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(desc);
+        JobEngineConfig jobEngineConfig = new JobEngineConfig(kylinConfig);
+        final String uuid = UUID.randomUUID().toString();
+        final String useDatabaseHql = "USE " + kylinConfig.getHiveDatabaseForIntermediateTable() + ";";
+        final String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc);
+        final String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, JobBuilderSupport.getJobWorkingDir(jobEngineConfig, uuid));
+        final String insertDataHqls;
+        try {
+            insertDataHqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, jobEngineConfig);
+        } catch (IOException e1) {
+            // keep the exception type callers already see, but chain the cause so the root failure is not lost
+            throw new RuntimeException("Failed to generate insert data SQL for intermediate table.", e1);
+        }
+
+        ShellExecutable step = new ShellExecutable();
+        // local, single-threaded buffer: no need for the synchronized StringBuffer
+        StringBuilder buf = new StringBuilder();
+        buf.append("hive -e \"");
+        buf.append(useDatabaseHql).append("\n");
+        buf.append(dropTableHql).append("\n");
+        buf.append(createTableHql).append("\n");
+        buf.append(insertDataHqls).append("\n");
+        buf.append("\"");
+
+        step.setCmd(buf.toString());
+        logger.info(step.getCmd());
+        step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
+        kylinConfig.getCliCommandExecutor().execute(step.getCmd(), null);
+        return intermediateTableDesc.getTableName();
+    }
+
+    private void clearSegment(String iiName) throws Exception {
+        IIInstance ii = iiManager.getII(iiName);
+        ii.getSegments().clear();
+        iiManager.updateII(ii);
+    }
+
+    private IISegment createSegment(String iiName) throws Exception {
+        clearSegment(iiName);
+        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+        f.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+        long date1 = 0;
+        long date2 = f.parse("2015-01-01").getTime();
+        return buildSegment(iiName, date1, date2);
+    }
+
+    private IISegment buildSegment(String iiName, long startDate, long endDate) throws Exception {
+        IIInstance iiInstance = iiManager.getII(iiName);
+        IISegment segment = iiManager.buildSegment(iiInstance, startDate, endDate);
+        iiInstance.getSegments().add(segment);
+        iiManager.updateII(iiInstance);
+        return segment;
+    }
+
+    /**
+     * Builds the named II through the streaming path: materialises the intermediate Hive
+     * table, creates a fresh segment and its HTable, queues every table row (sorted by the
+     * timestamp column) followed by EOF, then runs a stream builder over the queue and
+     * fails the test if that builder throws.
+     *
+     * @param iiName name of the inverted index to build
+     */
+    private void buildII(String iiName) throws Exception {
+        final IIDesc desc = iiManager.getII(iiName).getDescriptor();
+        final String tableName = createIntermediateTable(desc, kylinConfig);
+        logger.info("intermediate table name:" + tableName);
+
+        HiveTableReader reader = new HiveTableReader("default", tableName);
+        final List<TblColRef> tblColRefs = desc.listAllColumns();
+        for (TblColRef tblColRef : tblColRefs) {
+            // label each column by what isMetricsCol reports; the previous labels were misleading
+            if (desc.isMetricsCol(tblColRef)) {
+                logger.info("metrics:" + tblColRef.getName());
+            } else {
+                logger.info("dimension:" + tblColRef.getName());
+            }
+        }
+        LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<StreamMessage>();
+        final IISegment segment = createSegment(iiName);
+        String[] args = new String[] { "-iiname", iiName, "-htablename", segment.getStorageLocationIdentifier() };
+        ToolRunner.run(new IICreateHTableJob(), args);
+
+        final StreamBuilder streamBuilder = StreamBuilder.newLimitedSizeStreamBuilder(iiName, queue, new IIStreamConsumer(iiName, segment.getStorageLocationIdentifier(), segment.getIIDesc(), 0), 0, segment.getIIDesc().getSliceSize());
+
+        List<String[]> sorted = getSortedRows(reader, desc.getTimestampColumn());
+        int count = sorted.size();
+        for (String[] row : sorted) {
+            logger.info("another row: " + StringUtils.join(row, ","));
+            queue.put(parse(row));
+        }
+
+        reader.close();
+        logger.info("total record count:" + count + " htable:" + segment.getStorageLocationIdentifier());
+        queue.put(StreamMessage.EOF);
+
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        try {
+            final Future<?> future = executorService.submit(streamBuilder);
+            try {
+                future.get();
+            } catch (Exception e) {
+                logger.error("stream build failed", e);
+                fail("stream build failed");
+            }
+        } finally {
+            // the worker thread is non-daemon; shut the executor down so it does not outlive the test
+            executorService.shutdown();
+        }
+
+        logger.info("stream build finished, htable name:" + segment.getStorageLocationIdentifier());
+    }
+
+    /** Builds every II listed in {@code II_NAME} in turn and switches each one to READY once its build returns. */
+    @Test
+    public void test() throws Exception {
+        for (String iiName : II_NAME) {
+            buildII(iiName);
+
+            final IIInstance built = iiManager.getII(iiName);
+            if (built.getStatus() == RealizationStatusEnum.READY) {
+                continue; // already READY, nothing to persist
+            }
+            built.setStatus(RealizationStatusEnum.READY);
+            iiManager.updateII(built);
+        }
+    }
+
+    private StreamMessage parse(String[] row) {
+        return new StreamMessage(System.currentTimeMillis(), StringUtils.join(row, ",").getBytes());
+    }
+
+    private List<String[]> getSortedRows(HiveTableReader reader, final int tsCol) throws IOException {
+        List<String[]> unsorted = Lists.newArrayList();
+        while (reader.next()) {
+            unsorted.add(reader.getRow());
+        }
+        Collections.sort(unsorted, new Comparator<String[]>() {
+            @Override
+            public int compare(String[] o1, String[] o2) {
+                long t1 = DateFormat.stringToMillis(o1[tsCol]);
+                long t2 = DateFormat.stringToMillis(o2[tsCol]);
+                return Long.compare(t1, t2);
+            }
+        });
+        return unsorted;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/assembly/src/test/java/org/apache/kylin/job/DataGenTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DataGenTest.java b/assembly/src/test/java/org/apache/kylin/job/DataGenTest.java
new file mode 100644
index 0000000..5c01305
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/DataGenTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.job.dataGen.FactTableGenerator;
+import org.apache.kylin.metadata.MetadataManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Smoke test for {@link FactTableGenerator}, run against local file metadata.
+ */
+public class DataGenTest extends LocalFileMetadataTestCase {
+
+    @Before
+    public void before() throws Exception {
+        // fresh local metadata, and nothing cached from an earlier test
+        createTestMetadata();
+        MetadataManager.clearCache();
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    /**
+     * Generates fact-table content for the sample cube, checks that two known
+     * values occur in it and stores it as the data of default.test_kylin_fact.
+     */
+    @Test
+    public void testBasics() throws Exception {
+        final String cubeName = "test_kylin_cube_with_slr_ready";
+        // default settings
+        final String generated = FactTableGenerator.generate(cubeName, "10000", "1", null);
+        System.out.println(generated);
+
+        for (String expected : new String[] { "FP-non GTC", "ABIN" }) {
+            assertTrue(generated.contains(expected));
+        }
+
+        DeployUtil.overrideFactTableData(generated, "default.test_kylin_fact");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/assembly/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java b/assembly/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java
new file mode 100644
index 0000000..7f12069
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import java.io.File;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Dev-only helper rather than a real test: running it deploys the local
+ * metadata to the sandbox. It is ignored in normal builds.
+ */
+@Ignore("dev use only")
+public class DeployLocalMetaToRemoteTest {
+
+    private static final Log logger = LogFactory.getLog(DeployLocalMetaToRemoteTest.class);
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        final String sandboxConfDir = new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath();
+        logger.info("Adding to classpath: " + sandboxConfDir);
+        ClassUtil.addClasspath(sandboxConfDir);
+
+        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+        System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this
+    }
+
+    /** The deployment itself happens here; {@link #test()} is only a placeholder. */
+    @Before
+    public void before() throws Exception {
+        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+
+        DeployUtil.initCliWorkDir();
+        DeployUtil.deployMetadata();
+        DeployUtil.overrideJobJarLocations();
+    }
+
+    @After
+    public void after() {
+        HBaseMetadataTestCase.staticCleanupTestMetadata();
+    }
+
+    /** Intentionally empty apart from a marker line; it exists so that the fixtures run. */
+    @Test
+    public void test() throws Exception {
+        System.out.println("blank");
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
new file mode 100644
index 0000000..045f608
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.ResourceTool;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.job.dataGen.FactTableGenerator;
+import org.apache.kylin.job.streaming.KafkaDataLoader;
+import org.apache.kylin.job.streaming.StreamingTableDataGenerator;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.hive.HiveClient;
+import org.apache.kylin.streaming.StreamMessage;
+import org.apache.kylin.streaming.StreamingConfig;
+import org.apache.kylin.streaming.TimedJsonStreamParser;
+import org.apache.maven.model.Model;
+import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class DeployUtil {
+    private static final Logger logger = LoggerFactory.getLogger(DeployUtil.class);
+
+    /**
+     * Runs two commands through the configured CLI executor: removes the CLI
+     * working directory recursively, then creates the Kylin job log directory.
+     */
+    public static void initCliWorkDir() throws IOException {
+        final String removeWorkDir = "rm -rf " + getHadoopCliWorkingDir();
+        execCliCommand(removeWorkDir);
+
+        final String makeLogDir = "mkdir -p " + config().getKylinJobLogDir();
+        execCliCommand(makeLogDir);
+    }
+
+    /**
+     * Resets the metadata store of the current config, copies the local test
+     * metadata into it, then recalculates and saves the descriptor signature of
+     * every cube.
+     */
+    public static void deployMetadata() throws IOException {
+        // install metadata to hbase
+        ResourceTool.reset(config());
+        KylinConfig localMeta = KylinConfig.createInstanceFromUri(AbstractKylinTestCase.LOCALMETA_TEST_DATA);
+        ResourceTool.copy(localMeta, config());
+
+        // update cube desc signature.
+        for (CubeInstance each : CubeManager.getInstance(config()).listAllCubes()) {
+            each.getDescriptor().setSignature(each.getDescriptor().calculateSignature());
+
+            CubeUpdate update = new CubeUpdate(each);
+            CubeManager.getInstance(config()).updateCube(update);
+        }
+    }
+
+    /**
+     * Points the current config at the locally built MR job jar, coprocessor jar
+     * and Spark job jar, using absolute paths.
+     */
+    public static void overrideJobJarLocations() {
+        final String mrJobJar = getJobJarFile().getAbsolutePath();
+        final String coprocessorJar = getCoprocessorJarFile().getAbsolutePath();
+
+        config().overrideMRJobJarPath(mrJobJar);
+        config().overrideCoprocessorLocalJar(coprocessorJar);
+
+        final String sparkJobJar = getSparkJobJarFile().getAbsolutePath();
+        config().overrideSparkJobJarPath(sparkJobJar);
+    }
+
+    /**
+     * Reads the {@code <version>} element of {@code ../pom.xml}.
+     *
+     * @return the version declared in that pom, as returned by the Maven model
+     * @throws RuntimeException wrapping any failure to open or parse the pom
+     */
+    private static String getPomVersion() {
+        // try-with-resources: the reader used to be left open after parsing
+        try (FileReader pomFile = new FileReader("../pom.xml")) {
+            Model model = new MavenXpp3Reader().read(pomFile);
+            return model.getVersion();
+        } catch (Exception e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+    /** Job jar expected under ../assembly/target, named after the version from {@link #getPomVersion()}. */
+    private static File getJobJarFile() {
+        return new File("../assembly/target", "kylin-job-" + getPomVersion() + "-job.jar");
+    }
+
+    /** Coprocessor jar expected under ../storage-hbase/target, named after the version from {@link #getPomVersion()}. */
+    private static File getCoprocessorJarFile() {
+        return new File("../storage-hbase/target", "kylin-storage-hbase-" + getPomVersion() + "-coprocessor.jar");
+    }
+
+    /** Spark job jar expected under ../engine-spark/target, named after the version from {@link #getPomVersion()}. */
+    private static File getSparkJobJarFile() {
+        return new File("../engine-spark/target", "kylin-engine-spark-" + getPomVersion() + "-job.jar");
+    }
+
+    /** Runs {@code cmd} through the CLI command executor of the current config. */
+    private static void execCliCommand(String cmd) throws IOException {
+        config().getCliCommandExecutor().execute(cmd);
+    }
+
+    /** CLI working directory as configured in the current config. */
+    private static String getHadoopCliWorkingDir() {
+        return config().getCliWorkingDir();
+    }
+
+    /** Shorthand for {@code KylinConfig.getInstanceFromEnv()}; looked up on every call. */
+    private static KylinConfig config() {
+        return KylinConfig.getInstanceFromEnv();
+    }
+
+    // ============================================================================
+
+    // Qualified (database.table) names of the sample tables; they are upper-cased
+    // wherever they are used to look up metadata or build HQL.
+    static final String TABLE_CAL_DT = "edw.test_cal_dt";
+    static final String TABLE_CATEGORY_GROUPINGS = "default.test_category_groupings";
+    static final String TABLE_KYLIN_FACT = "default.test_kylin_fact";
+    static final String TABLE_SELLER_TYPE_DIM = "edw.test_seller_type_dim";
+    static final String TABLE_SITES = "edw.test_sites";
+
+    // All sample tables, in the order in which they are copied from the resource store.
+    static final String[] TABLE_NAMES = new String[] { TABLE_CAL_DT, TABLE_CATEGORY_GROUPINGS, TABLE_KYLIN_FACT, TABLE_SELLER_TYPE_DIM, TABLE_SITES };
+
+    /**
+     * Prepares the Hive test tables for a batch cube build.
+     *
+     * Unless the system property {@code buildCubeUsingProvidedData} is "true",
+     * fact-table content is first generated for {@code cubeName} and stored as the
+     * data of the fact table. The sample tables are deployed to Hive in both cases.
+     *
+     * @param cubeName cube whose descriptor drives the data generation
+     */
+    public static void prepareTestDataForNormalCubes(String cubeName) throws Exception {
+
+        final boolean useProvidedData = Boolean.parseBoolean(System.getProperty("buildCubeUsingProvidedData"));
+
+        if (useProvidedData) {
+            System.out.println("build normal cubes with provided dataset");
+        } else {
+            System.out.println("build cube with random dataset");
+            // data is generated according to cube descriptor and saved in resource store
+            final String generated = FactTableGenerator.generate(cubeName, "10000", "0.6", null);
+            assert generated != null;
+            overrideFactTableData(generated, TABLE_KYLIN_FACT.toUpperCase());
+        }
+
+        deployHiveTables();
+    }
+
+    /**
+     * Generates 10000 streaming messages for the fact table of the configured
+     * cube, loads them into Kafka, and stores the same rows as CSV under the fact
+     * table's data path.
+     *
+     * @param startTime       passed to the generator as the start of the time range
+     * @param endTime         passed to the generator as the end of the time range
+     * @param streamingConfig supplies the cube name and the Kafka target
+     */
+    public static void prepareTestDataForStreamingCube(long startTime, long endTime, StreamingConfig streamingConfig) throws IOException {
+        CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(streamingConfig.getCubeName());
+        List<String> messages = StreamingTableDataGenerator.generate(10000, startTime, endTime, cube.getFactTable());
+        TableDesc factTableDesc = cube.getFactTableDesc();
+
+        //load into kafka
+        KafkaDataLoader.loadIntoKafka(streamingConfig, messages);
+        logger.info("Write {} messages into topic {}", messages.size(), streamingConfig.getTopic());
+
+        //csv data for H2 use
+        List<TblColRef> columns = Lists.newArrayList();
+        for (ColumnDesc column : factTableDesc.getColumns()) {
+            columns.add(new TblColRef(column));
+        }
+        TimedJsonStreamParser parser = new TimedJsonStreamParser(columns, "formatTs=true");
+
+        final String newline = System.getProperty("line.separator");
+        StringBuilder csv = new StringBuilder();
+        for (String json : messages) {
+            StreamMessage message = new StreamMessage(0, json.getBytes());
+            List<String> fields = parser.parse(message).getStreamMessage();
+            csv.append(StringUtils.join(fields, ",")).append(newline);
+        }
+        overrideFactTableData(csv.toString(), cube.getFactTable());
+    }
+
+    /**
+     * Replaces the CSV stored for a fact table in the resource store.
+     *
+     * @param factTableContent new CSV content; written as UTF-8 bytes
+     * @param factTableName    table name; the resource path is /data/&lt;name&gt;.csv
+     */
+    public static void overrideFactTableData(String factTableContent, String factTableName) throws IOException {
+        final String resourcePath = "/data/" + factTableName + ".csv";
+        final ResourceStore resourceStore = ResourceStore.getStore(config());
+        final InputStream content = new ByteArrayInputStream(factTableContent.getBytes("UTF-8"));
+
+        // drop whatever is stored under the path, then write the new content
+        resourceStore.deleteResource(resourcePath);
+        resourceStore.putResource(resourcePath, content, System.currentTimeMillis());
+        content.close();
+    }
+
+    /**
+     * Copies the CSV of every table in {@code TABLE_NAMES} from the resource store
+     * into the local temp directory, then (re)creates the Hive tables and loads
+     * the CSV files into them.
+     *
+     * @throws IOException if a table's CSV is missing from the resource store or
+     *                     cannot be copied
+     * @throws Exception   if a Hive statement fails
+     */
+    private static void deployHiveTables() throws Exception {
+
+        MetadataManager metaMgr = MetadataManager.getInstance(config());
+
+        // scp data files, use the data from hbase, instead of local files
+        // The temp file only serves to locate the temp directory; createTempFile()
+        // has already created it, so no extra createNewFile() is needed.
+        File temp = File.createTempFile("temp", ".csv");
+        String tableFileDir = temp.getParent();
+        temp.delete();
+
+        for (String tablename : TABLE_NAMES) {
+            String upperName = tablename.toUpperCase();
+            String resourcePath = "/data/" + upperName + ".csv";
+
+            File localBufferFile = new File(tableFileDir + "/" + upperName + ".csv");
+            localBufferFile.deleteOnExit();
+
+            // Both streams are closed even when the copy fails half-way.
+            try (InputStream hbaseDataStream = metaMgr.getStore().getResource(resourcePath)) {
+                if (hbaseDataStream == null) {
+                    // fail with the offending path instead of an NPE inside IOUtils.copy
+                    throw new IOException("Table data " + resourcePath + " not found in resource store");
+                }
+                try (FileOutputStream localFileStream = new FileOutputStream(localBufferFile)) {
+                    IOUtils.copy(hbaseDataStream, localFileStream);
+                }
+            }
+        }
+
+        HiveClient hiveClient = new HiveClient();
+
+        // create hive tables; driven by TABLE_NAMES so the list cannot drift
+        hiveClient.executeHQL("CREATE DATABASE IF NOT EXISTS EDW");
+        for (String tablename : TABLE_NAMES) {
+            hiveClient.executeHQL(generateCreateTableHql(metaMgr.getTableDesc(tablename.toUpperCase())));
+        }
+
+        // load data to hive tables
+        // LOAD DATA LOCAL INPATH 'filepath' [OVERWRITE] INTO TABLE tablename
+        for (String tablename : TABLE_NAMES) {
+            hiveClient.executeHQL(generateLoadDataHql(tablename, tableFileDir));
+        }
+    }
+
+    private static String generateLoadDataHql(String tableName, String tableFileDir) {
+        return "LOAD DATA LOCAL INPATH '" + tableFileDir + "/" + tableName.toUpperCase() + ".csv' OVERWRITE INTO TABLE " + tableName.toUpperCase();
+    }
+
+    private static String[] generateCreateTableHql(TableDesc tableDesc) {
+
+        String dropsql = "DROP TABLE IF EXISTS " + tableDesc.getIdentity();
+        StringBuilder ddl = new StringBuilder();
+
+        ddl.append("CREATE TABLE " + tableDesc.getIdentity() + "\n");
+        ddl.append("(" + "\n");
+
+        for (int i = 0; i < tableDesc.getColumns().length; i++) {
+            ColumnDesc col = tableDesc.getColumns()[i];
+            if (i > 0) {
+                ddl.append(",");
+            }
+            ddl.append(col.getName() + " " + getHiveDataType((col.getDatatype())) + "\n");
+        }
+
+        ddl.append(")" + "\n");
+        ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY ','" + "\n");
+        ddl.append("STORED AS TEXTFILE");
+
+        return new String[] { dropsql, ddl.toString() };
+    }
+
+    private static String getHiveDataType(String javaDataType) {
+        String hiveDataType = javaDataType.toLowerCase().startsWith("varchar") ? "string" : javaDataType;
+        hiveDataType = javaDataType.toLowerCase().startsWith("integer") ? "int" : hiveDataType;
+
+        return hiveDataType.toLowerCase();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/assembly/src/test/java/org/apache/kylin/job/ExportHBaseData.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/ExportHBaseData.java b/assembly/src/test/java/org/apache/kylin/job/ExportHBaseData.java
new file mode 100644
index 0000000..5eb7485
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/ExportHBaseData.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.SSHClient;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.storage.hbase.steps.HBaseMiniclusterHelper;
+
+public class ExportHBaseData {
+
+    // All fields below are assigned in setup().
+    KylinConfig kylinConfig;
+    // tables listed by the HBase admin at setup time
+    HTableDescriptor[] allTables;
+    Configuration config;
+    HBaseAdmin hbase;
+    CliCommandExecutor cli;
+    // <hdfs working dir>hbase-export/<currentTIME>/
+    String exportHdfsFolder;
+    // <local temp dir>hbase-export/
+    String exportLocalFolderParent;
+    // exportLocalFolderParent + <currentTIME>/
+    String exportLocalFolder;
+    // exportLocalFolderParent + hbase-export-at-<currentTIME>.tar.gz
+    String backupArchive;
+    // part of the metadata url before '@'
+    String tableNameBase;
+    // System.currentTimeMillis() taken in setup(); names this export run
+    long currentTIME;
+
+    /**
+     * Creates the exporter and runs {@code setup()} immediately.
+     *
+     * @throws RuntimeException wrapping any IOException raised by the setup
+     */
+    public ExportHBaseData() {
+        try {
+            setup();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Re-initialises the Kylin config from the sandbox test data, derives the
+     * export folder and archive names from the current time, and lists the HBase
+     * tables reachable through the metadata url.
+     *
+     * @throws IOException if the HBase admin cannot be created or the tables
+     *                     cannot be listed
+     */
+    private void setup() throws IOException {
+
+        KylinConfig.destoryInstance();
+        System.setProperty(KylinConfig.KYLIN_CONF, AbstractKylinTestCase.SANDBOX_TEST_DATA);
+
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+        cli = kylinConfig.getCliCommandExecutor();
+
+        currentTIME = System.currentTimeMillis();
+        exportHdfsFolder = kylinConfig.getHdfsWorkingDirectory() + "hbase-export/" + currentTIME + "/";
+        exportLocalFolderParent = BatchConstants.CFG_KYLIN_LOCAL_TEMP_DIR + "hbase-export/";
+        exportLocalFolder = exportLocalFolderParent + currentTIME + "/";
+        backupArchive = exportLocalFolderParent + "hbase-export-at-" + currentTIME + ".tar.gz";
+
+        String metadataUrl = kylinConfig.getMetadataUrl();
+        // split TABLE@HBASE_URL
+        int cut = metadataUrl.indexOf('@');
+        // A url without '@' used to fail in substring(0, -1) although the next line
+        // accepts it. NOTE(review): "kylin_metadata" is presumed to be the default
+        // table name of the HBase resource store; confirm.
+        tableNameBase = cut < 0 ? "kylin_metadata" : metadataUrl.substring(0, cut);
+        String hbaseUrl = cut < 0 ? metadataUrl : metadataUrl.substring(cut + 1);
+
+        // An IOException propagates as is; the constructor wraps it together with
+        // its stack trace, so nothing is printed separately here.
+        HConnection conn = HBaseConnection.get(hbaseUrl);
+        hbase = new HBaseAdmin(conn);
+        config = hbase.getConfiguration();
+        allTables = hbase.listTables();
+    }
+
+    public void tearDown() {
+
+        // cleanup hdfs
+        try {
+            if (cli != null && exportHdfsFolder != null) {
+                cli.execute("hadoop fs -rm -r " + exportHdfsFolder);
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        // cleanup sandbox disk
+        try {
+            if (cli != null && exportLocalFolder != null) {
+                cli.execute("rm -r " + exportLocalFolder);
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        // delete archive file on sandbox
+        try {
+            if (cli != null && backupArchive != null) {
+                cli.execute("rm " + backupArchive);
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Exports the metadata table and every table whose name starts with the
+     * shared storage prefix to HDFS, copies the export folder to the local
+     * disk, packs it into a tar.gz archive and downloads that archive.
+     */
+    public void exportTables() throws IOException {
+        cli.execute("mkdir -p " + exportLocalFolderParent);
+
+        for (HTableDescriptor descriptor : allTables) {
+            final String tableName = descriptor.getNameAsString();
+            if (tableName.equals(tableNameBase) || tableName.startsWith(HBaseMiniclusterHelper.SHARED_STORAGE_PREFIX)) {
+                cli.execute("hbase org.apache.hadoop.hbase.mapreduce.Export " + tableName + " " + exportHdfsFolder + tableName);
+            }
+        }
+
+        final String copyToLocal = "hadoop fs -copyToLocal " + exportHdfsFolder + " " + exportLocalFolderParent;
+        cli.execute(copyToLocal);
+
+        final String createArchive = "tar -zcvf " + backupArchive + " --directory=" + exportLocalFolderParent + " " + currentTIME;
+        cli.execute(createArchive);
+
+        downloadToLocal();
+    }
+
+    /**
+     * Copies the backup archive to
+     * {@code ../examples/test_case_data/minicluster/hbase-export.tar.gz}: via scp
+     * when the config says commands run remotely, otherwise with a local file copy.
+     *
+     * @throws IOException if the archive cannot be downloaded or copied
+     */
+    public void downloadToLocal() throws IOException {
+        String localArchive = "../examples/test_case_data/minicluster/hbase-export.tar.gz";
+
+        if (kylinConfig.getRunAsRemoteCommand()) {
+            SSHClient ssh = new SSHClient(kylinConfig.getRemoteHadoopCliHostname(), kylinConfig.getRemoteHadoopCliPort(), kylinConfig.getRemoteHadoopCliUsername(), kylinConfig.getRemoteHadoopCliPassword());
+            try {
+                ssh.scpFileToLocal(backupArchive, localArchive);
+            } catch (Exception e) {
+                // A failed download used to be printed and ignored, so the export
+                // looked successful without an archive; report it to the caller.
+                throw new IOException("Failed to download " + backupArchive + " to " + localArchive, e);
+            }
+        } else {
+            FileUtils.copyFile(new File(backupArchive), new File(localArchive));
+        }
+    }
+
+    /**
+     * Command-line entry point: runs one export and always cleans up afterwards.
+     * An export failure is printed rather than propagated.
+     */
+    public static void main(String[] args) {
+        final ExportHBaseData exporter = new ExportHBaseData();
+        try {
+            exporter.exportTables();
+        } catch (IOException exportFailure) {
+            exportFailure.printStackTrace();
+        } finally {
+            // remove the hdfs/local export folders and the remote archive
+            exporter.tearDown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/assembly/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java b/assembly/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
new file mode 100644
index 0000000..6a615cb
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
@@ -0,0 +1,85 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ *
+ */
+
+package org.apache.kylin.job;
+
+import java.io.File;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.job.streaming.StreamingBootstrap;
+import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manual integration run: redeploys the metadata, starts the streaming
+ * bootstrap and lets it run for thirty minutes before stopping it. Ignored by
+ * default because it overwrites the existing metadata store.
+ */
+@Ignore("this test case will break existing metadata store")
+public class ITKafkaBasedIIStreamBuilderTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(ITKafkaBasedIIStreamBuilderTest.class);
+
+    // how long streaming is left running, and the pause after stop() is called
+    private static final long RUN_MILLIS = 30 * 60 * 1000;
+    private static final long SHUTDOWN_WAIT_MILLIS = 5 * 1000;
+
+    private KylinConfig kylinConfig;
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        final String sandboxConfDir = new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath();
+        ClassUtil.addClasspath(sandboxConfDir);
+        System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this
+    }
+
+    @Before
+    public void before() throws Exception {
+        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+
+        DeployUtil.initCliWorkDir();
+        DeployUtil.deployMetadata();
+        DeployUtil.overrideJobJarLocations();
+    }
+
+    @Test
+    public void test() throws Exception {
+        final StreamingBootstrap streaming = StreamingBootstrap.getInstance(kylinConfig);
+        streaming.start("eagle", 0);
+        Thread.sleep(RUN_MILLIS);
+
+        logger.info("time is up, stop streaming");
+        streaming.stop();
+        Thread.sleep(SHUTDOWN_WAIT_MILLIS);
+    }
+}