You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/09/19 02:27:53 UTC

[04/12] incubator-kylin git commit: KYLIN-1010 Decompose project job

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/DeployUtil.java b/job/src/test/java/org/apache/kylin/job/DeployUtil.java
deleted file mode 100644
index 8c92f87..0000000
--- a/job/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job;
-
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.persistence.ResourceTool;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.job.dataGen.FactTableGenerator;
-import org.apache.kylin.job.streaming.KafkaDataLoader;
-import org.apache.kylin.job.streaming.StreamingTableDataGenerator;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.hive.HiveClient;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.StreamingConfig;
-import org.apache.kylin.streaming.TimedJsonStreamParser;
-import org.apache.maven.model.Model;
-import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- * Static helpers used by tests to prepare an environment: CLI working directories,
- * metadata, job jar locations, and fact/lookup table data in the resource store and Hive.
- * Every method works against the config returned by {@link KylinConfig#getInstanceFromEnv()}.
- */
-public class DeployUtil {
-    private static final Logger logger = LoggerFactory.getLogger(DeployUtil.class);
-
-    /**
-     * Removes the CLI working directory and creates the job log directory, both through
-     * the configured CLI command executor.
-     *
-     * @throws IOException if one of the commands fails
-     */
-    public static void initCliWorkDir() throws IOException {
-        execCliCommand("rm -rf " + getHadoopCliWorkingDir());
-        execCliCommand("mkdir -p " + config().getKylinJobLogDir());
-    }
-
-    /**
-     * Resets the resource store of the current config, copies the local test metadata
-     * ({@link AbstractKylinTestCase#LOCALMETA_TEST_DATA}) over, and refreshes the signature
-     * of every cube descriptor.
-     *
-     * @throws IOException if resetting, copying or updating a cube fails
-     */
-    public static void deployMetadata() throws IOException {
-        // install metadata to hbase
-        ResourceTool.reset(config());
-        ResourceTool.copy(KylinConfig.createInstanceFromUri(AbstractKylinTestCase.LOCALMETA_TEST_DATA), config());
-
-        // update cube desc signature.
-        for (CubeInstance cube : CubeManager.getInstance(config()).listAllCubes()) {
-            cube.getDescriptor().setSignature(cube.getDescriptor().calculateSignature());
-            CubeUpdate cubeBuilder = new CubeUpdate(cube);
-            CubeManager.getInstance(config()).updateCube(cubeBuilder);
-        }
-    }
-
-    /**
-     * Points the MR job jar, the coprocessor jar and the Spark job jar of the current config
-     * at the artifacts expected under the sibling modules' {@code target} directories.
-     */
-    public static void overrideJobJarLocations() {
-        File jobJar = getJobJarFile();
-        File coprocessorJar = getCoprocessorJarFile();
-
-        config().overrideMRJobJarPath(jobJar.getAbsolutePath());
-        config().overrideCoprocessorLocalJar(coprocessorJar.getAbsolutePath());
-        config().overrideSparkJobJarPath(getSparkJobJarFile().getAbsolutePath());
-    }
-
-    /**
-     * Reads the project version from {@code ../pom.xml} (relative to the working directory).
-     *
-     * @throws RuntimeException wrapping any failure to read or parse the pom
-     */
-    private static String getPomVersion() {
-        // try-with-resources: the reader used to stay open, also when parsing failed
-        try (FileReader pomFile = new FileReader("../pom.xml")) {
-            Model model = new MavenXpp3Reader().read(pomFile);
-            return model.getVersion();
-        } catch (Exception e) {
-            throw new RuntimeException(e.getMessage(), e);
-        }
-    }
-
-    private static File getJobJarFile() {
-        return new File("../job/target", "kylin-job-" + getPomVersion() + "-job.jar");
-    }
-
-    private static File getCoprocessorJarFile() {
-        return new File("../storage-hbase/target", "kylin-storage-hbase-" + getPomVersion() + "-coprocessor.jar");
-    }
-
-    private static File getSparkJobJarFile() {
-        return new File("../engine-spark/target", "kylin-engine-spark-" + getPomVersion() + "-job.jar");
-    }
-
-    private static void execCliCommand(String cmd) throws IOException {
-        config().getCliCommandExecutor().execute(cmd);
-    }
-
-    private static String getHadoopCliWorkingDir() {
-        return config().getCliWorkingDir();
-    }
-
-    private static KylinConfig config() {
-        return KylinConfig.getInstanceFromEnv();
-    }
-
-    // ============================================================================
-
-    static final String TABLE_CAL_DT = "edw.test_cal_dt";
-    static final String TABLE_CATEGORY_GROUPINGS = "default.test_category_groupings";
-    static final String TABLE_KYLIN_FACT = "default.test_kylin_fact";
-    static final String TABLE_SELLER_TYPE_DIM = "edw.test_seller_type_dim";
-    static final String TABLE_SITES = "edw.test_sites";
-
-    // the Hive tables are created and loaded in exactly this order
-    static final String[] TABLE_NAMES = new String[] { TABLE_CAL_DT, TABLE_CATEGORY_GROUPINGS, TABLE_KYLIN_FACT, TABLE_SELLER_TYPE_DIM, TABLE_SITES };
-
-    /**
-     * Prepares the fact table data for a normal (non-streaming) cube and deploys all test
-     * tables to Hive.
-     * <p>
-     * Unless the system property {@code buildCubeUsingProvidedData} is {@code true}, fact
-     * table content is generated by {@link FactTableGenerator} and written to the resource
-     * store first; otherwise the data already in the store is used as is.
-     *
-     * @param cubeName name of the cube the data is generated for
-     * @throws Exception if data generation, the resource store or Hive fails
-     */
-    public static void prepareTestDataForNormalCubes(String cubeName) throws Exception {
-
-        String factTableName = TABLE_KYLIN_FACT.toUpperCase();
-
-        boolean buildCubeUsingProvidedData = Boolean.parseBoolean(System.getProperty("buildCubeUsingProvidedData"));
-        if (!buildCubeUsingProvidedData) {
-            System.out.println("build cube with random dataset");
-            // data is generated according to cube descriptor and saved in resource store
-            String content = FactTableGenerator.generate(cubeName, "10000", "0.6", null);
-            assert content != null;
-            overrideFactTableData(content, factTableName);
-        } else {
-            System.out.println("build normal cubes with provided dataset");
-        }
-
-        deployHiveTables();
-    }
-
-    /**
-     * Generates 10000 streaming messages for the fact table of the cube named by
-     * {@code streamingConfig}, loads them into Kafka, and stores the same rows as CSV
-     * in the resource store.
-     *
-     * @param startTime       passed to the data generator as start of the time range
-     * @param endTime         passed to the data generator as end of the time range
-     * @param streamingConfig supplies the cube name and the Kafka target
-     * @throws IOException if the resource store cannot be written
-     */
-    public static void prepareTestDataForStreamingCube(long startTime, long endTime, StreamingConfig streamingConfig) throws IOException {
-        CubeInstance cubeInstance = CubeManager.getInstance(config()).getCube(streamingConfig.getCubeName());
-        List<String> data = StreamingTableDataGenerator.generate(10000, startTime, endTime, cubeInstance.getFactTable());
-        TableDesc tableDesc = cubeInstance.getFactTableDesc();
-
-        //load into kafka
-        KafkaDataLoader.loadIntoKafka(streamingConfig, data);
-        logger.info("Write {} messages into topic {}", data.size(), streamingConfig.getTopic());
-
-        //csv data for H2 use
-        List<TblColRef> tableColumns = Lists.newArrayList();
-        for (ColumnDesc columnDesc : tableDesc.getColumns()) {
-            tableColumns.add(new TblColRef(columnDesc));
-        }
-        TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, "formatTs=true");
-        String lineSeparator = System.getProperty("line.separator");
-        StringBuilder sb = new StringBuilder();
-        for (String json : data) {
-            // NOTE(review): getBytes() uses the platform charset; confirm what the parser decodes with
-            List<String> rowColumns = timedJsonStreamParser.parse(new StreamMessage(0, json.getBytes())).getStreamMessage();
-            sb.append(StringUtils.join(rowColumns, ","));
-            sb.append(lineSeparator);
-        }
-        overrideFactTableData(sb.toString(), cubeInstance.getFactTable());
-    }
-
-    /**
-     * Replaces the resource {@code /data/<factTableName>.csv} in the resource store with
-     * the given content, encoded as UTF-8.
-     *
-     * @param factTableContent CSV content to store
-     * @param factTableName    table name used to build the resource path
-     * @throws IOException if the resource store cannot be updated
-     */
-    public static void overrideFactTableData(String factTableContent, String factTableName) throws IOException {
-        // Write to resource store
-        ResourceStore store = ResourceStore.getStore(config());
-        String factTablePath = "/data/" + factTableName + ".csv";
-
-        // closed even when the store rejects the delete/put
-        try (InputStream in = new ByteArrayInputStream(factTableContent.getBytes("UTF-8"))) {
-            store.deleteResource(factTablePath);
-            store.putResource(factTablePath, in, System.currentTimeMillis());
-        }
-    }
-
-    /**
-     * Copies the CSV of every table in {@link #TABLE_NAMES} from the resource store into
-     * the local temp directory, then (re)creates the Hive tables and loads the CSV files
-     * into them.
-     */
-    private static void deployHiveTables() throws Exception {
-
-        MetadataManager metaMgr = MetadataManager.getInstance(config());
-
-        // the temp file only serves to discover the temp directory; createTempFile()
-        // already creates it, so it can be removed as soon as the directory is known
-        File temp = File.createTempFile("temp", ".csv");
-        String tableFileDir = temp.getParent();
-        temp.delete();
-
-        // scp data files, use the data from hbase, instead of local files
-        for (String tablename : TABLE_NAMES) {
-            tablename = tablename.toUpperCase();
-            String resourcePath = "/data/" + tablename + ".csv";
-
-            File localBufferFile = new File(tableFileDir + "/" + tablename + ".csv");
-            localBufferFile.deleteOnExit();
-
-            // both streams are closed even when the copy fails half way
-            try (InputStream hbaseDataStream = metaMgr.getStore().getResource(resourcePath); //
-                    FileOutputStream localFileStream = new FileOutputStream(localBufferFile)) {
-                if (hbaseDataStream == null) {
-                    // name the missing resource instead of failing with an NPE inside IOUtils.copy
-                    throw new IllegalStateException("Resource " + resourcePath + " not found in the resource store");
-                }
-                IOUtils.copy(hbaseDataStream, localFileStream);
-            }
-        }
-
-        HiveClient hiveClient = new HiveClient();
-
-        // create hive tables
-        hiveClient.executeHQL("CREATE DATABASE IF NOT EXISTS EDW");
-        for (String tableName : TABLE_NAMES) {
-            hiveClient.executeHQL(generateCreateTableHql(metaMgr.getTableDesc(tableName.toUpperCase())));
-        }
-
-        // load data to hive tables
-        // LOAD DATA LOCAL INPATH 'filepath' [OVERWRITE] INTO TABLE tablename
-        for (String tableName : TABLE_NAMES) {
-            hiveClient.executeHQL(generateLoadDataHql(tableName, tableFileDir));
-        }
-    }
-
-    /** Builds the {@code LOAD DATA LOCAL INPATH ... OVERWRITE INTO TABLE} statement for one table. */
-    private static String generateLoadDataHql(String tableName, String tableFileDir) {
-        String upperName = tableName.toUpperCase();
-        return "LOAD DATA LOCAL INPATH '" + tableFileDir + "/" + upperName + ".csv' OVERWRITE INTO TABLE " + upperName;
-    }
-
-    /**
-     * Builds the statements that recreate a table as a comma-delimited text file table.
-     *
-     * @return two statements: the {@code DROP TABLE IF EXISTS} and the {@code CREATE TABLE}
-     */
-    private static String[] generateCreateTableHql(TableDesc tableDesc) {
-
-        String dropsql = "DROP TABLE IF EXISTS " + tableDesc.getIdentity();
-        StringBuilder ddl = new StringBuilder();
-
-        ddl.append("CREATE TABLE ").append(tableDesc.getIdentity()).append("\n");
-        ddl.append("(").append("\n");
-
-        ColumnDesc[] columns = tableDesc.getColumns();
-        for (int i = 0; i < columns.length; i++) {
-            if (i > 0) {
-                ddl.append(",");
-            }
-            ddl.append(columns[i].getName()).append(" ").append(getHiveDataType(columns[i].getDatatype())).append("\n");
-        }
-
-        ddl.append(")").append("\n");
-        ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY ','").append("\n");
-        ddl.append("STORED AS TEXTFILE");
-
-        return new String[] { dropsql, ddl.toString() };
-    }
-
-    /**
-     * Maps a column data type to the type name used in the Hive DDL: types starting with
-     * {@code varchar} become {@code string}, types starting with {@code integer} become
-     * {@code int}, anything else is passed through in lower case.
-     */
-    private static String getHiveDataType(String javaDataType) {
-        String lowerCased = javaDataType.toLowerCase();
-        if (lowerCased.startsWith("varchar")) {
-            return "string";
-        }
-        if (lowerCased.startsWith("integer")) {
-            return "int";
-        }
-        return lowerCased;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java b/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java
deleted file mode 100644
index 2d8bb05..0000000
--- a/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
-import org.apache.kylin.common.util.CliCommandExecutor;
-import org.apache.kylin.common.util.SSHClient;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.storage.hbase.steps.HBaseConnection;
-import org.apache.kylin.storage.hbase.steps.HBaseMiniclusterHelper;
-
-/**
- * Command line utility (see {@link #main(String[])}) that exports HBase tables of the
- * sandbox environment with the HBase {@code Export} MR job, packs the export into a
- * tar.gz archive and copies that archive to
- * {@code ../examples/test_case_data/minicluster/hbase-export.tar.gz}.
- * <p>
- * Exported are the metadata table named in the metadata url and every table whose name
- * starts with {@link HBaseMiniclusterHelper#SHARED_STORAGE_PREFIX}.
- */
-public class ExportHBaseData {
-
-    KylinConfig kylinConfig;
-    HTableDescriptor[] allTables;
-    Configuration config;
-    HBaseAdmin hbase;
-    CliCommandExecutor cli;
-    String exportHdfsFolder;
-    String exportLocalFolderParent;
-    String exportLocalFolder;
-    String backupArchive;
-    String tableNameBase;
-    long currentTIME;
-
-    /**
-     * Switches the Kylin config to the sandbox test data and connects to HBase.
-     *
-     * @throws RuntimeException wrapping the {@link IOException} if HBase cannot be reached
-     * @throws IllegalArgumentException if the metadata url is not of the form TABLE@HBASE_URL
-     */
-    public ExportHBaseData() {
-        try {
-            setup();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /** Resolves config, export locations and the metadata table name, then lists all HBase tables. */
-    private void setup() throws IOException {
-
-        KylinConfig.destoryInstance();
-        System.setProperty(KylinConfig.KYLIN_CONF, AbstractKylinTestCase.SANDBOX_TEST_DATA);
-
-        kylinConfig = KylinConfig.getInstanceFromEnv();
-        cli = kylinConfig.getCliCommandExecutor();
-
-        // the timestamp keeps the folders and the archive of this run apart from earlier runs
-        currentTIME = System.currentTimeMillis();
-        exportHdfsFolder = kylinConfig.getHdfsWorkingDirectory() + "hbase-export/" + currentTIME + "/";
-        exportLocalFolderParent = BatchConstants.CFG_KYLIN_LOCAL_TEMP_DIR + "hbase-export/";
-        exportLocalFolder = exportLocalFolderParent + currentTIME + "/";
-        backupArchive = exportLocalFolderParent + "hbase-export-at-" + currentTIME + ".tar.gz";
-
-        String metadataUrl = kylinConfig.getMetadataUrl();
-        // split TABLE@HBASE_URL
-        int cut = metadataUrl.indexOf('@');
-        if (cut < 0) {
-            // without '@' there is no table name to export; substring(0, -1) used to fail
-            // here with an unexplained StringIndexOutOfBoundsException
-            throw new IllegalArgumentException("Metadata url '" + metadataUrl + "' is not of the form TABLE@HBASE_URL");
-        }
-        tableNameBase = metadataUrl.substring(0, cut);
-        String hbaseUrl = metadataUrl.substring(cut + 1);
-
-        HConnection conn = HBaseConnection.get(hbaseUrl);
-        hbase = new HBaseAdmin(conn);
-        config = hbase.getConfiguration();
-        allTables = hbase.listTables();
-    }
-
-    /**
-     * Best-effort cleanup: removes the HDFS export folder, the local export folder and the
-     * archive on the machine the CLI commands run on, then closes the HBase admin. A failing
-     * step is reported on stderr and does not stop the remaining steps.
-     */
-    public void tearDown() {
-
-        // cleanup hdfs
-        cleanupQuietly("hadoop fs -rm -r ", exportHdfsFolder);
-        // cleanup sandbox disk
-        cleanupQuietly("rm -r ", exportLocalFolder);
-        // delete archive file on sandbox
-        cleanupQuietly("rm ", backupArchive);
-
-        // the admin was never released before
-        if (hbase != null) {
-            try {
-                hbase.close();
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    /** Runs {@code commandPrefix + target} unless the executor or the target is missing; failures are only printed. */
-    private void cleanupQuietly(String commandPrefix, String target) {
-        if (cli == null || target == null) {
-            return;
-        }
-        try {
-            cli.execute(commandPrefix + target);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-    }
-
-    /**
-     * Exports the selected tables to HDFS, copies the export to the local disk, archives it
-     * and downloads the archive.
-     *
-     * @throws IOException if a CLI command or the download fails
-     */
-    public void exportTables() throws IOException {
-        cli.execute("mkdir -p " + exportLocalFolderParent);
-
-        for (HTableDescriptor table : allTables) {
-            String tName = table.getNameAsString();
-            // only the metadata table and the shared-storage tables are exported
-            if (!tName.equals(tableNameBase) && !tName.startsWith(HBaseMiniclusterHelper.SHARED_STORAGE_PREFIX))
-                continue;
-
-            cli.execute("hbase org.apache.hadoop.hbase.mapreduce.Export " + tName + " " + exportHdfsFolder + tName);
-        }
-
-        cli.execute("hadoop fs -copyToLocal " + exportHdfsFolder + " " + exportLocalFolderParent);
-        cli.execute("tar -zcvf " + backupArchive + " --directory=" + exportLocalFolderParent + " " + currentTIME);
-        downloadToLocal();
-    }
-
-    /**
-     * Copies the archive to {@code ../examples/test_case_data/minicluster/hbase-export.tar.gz},
-     * via scp when commands run remotely, via a plain file copy otherwise.
-     *
-     * @throws IOException if the archive cannot be copied
-     */
-    public void downloadToLocal() throws IOException {
-        String localArchive = "../examples/test_case_data/minicluster/hbase-export.tar.gz";
-
-        if (kylinConfig.getRunAsRemoteCommand()) {
-            SSHClient ssh = new SSHClient(kylinConfig.getRemoteHadoopCliHostname(), kylinConfig.getRemoteHadoopCliPort(), kylinConfig.getRemoteHadoopCliUsername(), kylinConfig.getRemoteHadoopCliPassword());
-            try {
-                ssh.scpFileToLocal(backupArchive, localArchive);
-            } catch (Exception e) {
-                // report the failure to the caller: tearDown() deletes the remote archive
-                // afterwards, so a swallowed error would silently lose the export
-                throw new IOException("Failed to download " + backupArchive + " to " + localArchive, e);
-            }
-        } else {
-            FileUtils.copyFile(new File(backupArchive), new File(localArchive));
-        }
-    }
-
-    /** Runs one export and always cleans up afterwards; arguments are ignored. */
-    public static void main(String[] args) {
-        ExportHBaseData export = new ExportHBaseData();
-        try {
-            export.exportTables();
-        } catch (IOException e) {
-            e.printStackTrace();
-        } finally {
-            export.tearDown();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java b/job/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
deleted file mode 100644
index 6a615cb..0000000
--- a/job/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.job;
-
-import java.io.File;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.job.streaming.StreamingBootstrap;
-import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-@Ignore("this test case will break existing metadata store")
-public class ITKafkaBasedIIStreamBuilderTest {
-
-    private static final Logger logger = LoggerFactory.getLogger(ITKafkaBasedIIStreamBuilderTest.class);
-
-    private KylinConfig kylinConfig;
-
-    @BeforeClass
-    public static void beforeClass() throws Exception {
-        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this
-    }
-
-    @Before
-    public void before() throws Exception {
-        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
-
-        kylinConfig = KylinConfig.getInstanceFromEnv();
-        DeployUtil.initCliWorkDir();
-        DeployUtil.deployMetadata();
-        DeployUtil.overrideJobJarLocations();
-    }
-
-    @Test
-    public void test() throws Exception {
-        final StreamingBootstrap bootstrap = StreamingBootstrap.getInstance(kylinConfig);
-        bootstrap.start("eagle", 0);
-        Thread.sleep(30 * 60 * 1000);
-        logger.info("time is up, stop streaming");
-        bootstrap.stop();
-        Thread.sleep(5 * 1000);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/dataGen/ColumnConfig.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/dataGen/ColumnConfig.java b/job/src/test/java/org/apache/kylin/job/dataGen/ColumnConfig.java
deleted file mode 100644
index 44ba8f4..0000000
--- a/job/src/test/java/org/apache/kylin/job/dataGen/ColumnConfig.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.dataGen;
-
-import java.util.ArrayList;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-/**
- * JSON-bound settings for one column used by the test data generator.
- * <p>
- * Jackson auto-detection is switched off for fields, getters, is-getters and setters,
- * so only the fields explicitly marked with {@link JsonProperty} are bound.
- */
-@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
-public class ColumnConfig {
-    /** Bound to the JSON property {@code columnName}. */
-    @JsonProperty("columnName")
-    private String columnName;
-    /** Bound to the JSON property {@code valueSet}. */
-    @JsonProperty("valueSet")
-    private ArrayList<String> valueSet;
-    /** Bound to the JSON property {@code exclusive}. */
-    @JsonProperty("exclusive")
-    private boolean exclusive;
-    /** Bound to the JSON property {@code asRange}. */
-    @JsonProperty("asRange")
-    private boolean asRange;
-
-    // accessors, in field declaration order
-
-    public String getColumnName() {
-        return this.columnName;
-    }
-
-    public void setColumnName(String columnName) {
-        this.columnName = columnName;
-    }
-
-    public ArrayList<String> getValueSet() {
-        return this.valueSet;
-    }
-
-    public void setValueSet(ArrayList<String> valueSet) {
-        this.valueSet = valueSet;
-    }
-
-    public boolean isExclusive() {
-        return this.exclusive;
-    }
-
-    public void setExclusive(boolean exclusive) {
-        this.exclusive = exclusive;
-    }
-
-    public boolean isAsRange() {
-        return this.asRange;
-    }
-
-    public void setAsRange(boolean asRange) {
-        this.asRange = asRange;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java b/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
deleted file mode 100644
index a965753..0000000
--- a/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
+++ /dev/null
@@ -1,647 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.dataGen;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.Array;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.DataType;
-import org.apache.kylin.metadata.model.JoinDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-
-/**
- */
-public class FactTableGenerator {
-    // cube, desc, store and factTableName are resolved in init() from the environment config
-    CubeInstance cube = null;
-    CubeDesc desc = null;
-    ResourceStore store = null;
-    String factTableName = null;
-
-    // loaded from /data/data_gen_config.json by loadConfig()
-    GenConfig genConf = null;
-
-    // created in init() from randomSeed
-    Random r = null;
-
-    // generation parameters, copied from the init() arguments
-    String cubeName;
-    long randomSeed;
-    int rowCount;
-    // init() resets this to 0
-    int unlinkableRowCount;
-    // init() sets this to rowCount * (1 - linkableRatio), truncated to int
-    int unlinkableRowCountMax;
-    double conflictRatio;
-    double linkableRatio;
-
-    // the names of the lookup table columns that are related to the fact
-    // table (i.e. appear as foreign keys in the fact table)
-    TreeMap<String, LinkedList<String>> lookupTableKeys = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
-
-    // possible values of lookupTableKeys, extracted from the existing lookup
-    // tables.
-    // The key has the format tablename/columnname
-    TreeMap<String, ArrayList<String>> feasibleValues = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
-
-    // lookup table name -> set of all composite keys
-    TreeMap<String, HashSet<Array<String>>> lookupTableCompositeKeyValues = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
-
-    private void init(String cubeName, int rowCount, double conflictRaio, double linkableRatio, long randomSeed) {
-        this.rowCount = rowCount;
-        this.conflictRatio = conflictRaio;
-        this.cubeName = cubeName;
-        this.randomSeed = randomSeed;
-        this.linkableRatio = linkableRatio;
-
-        this.unlinkableRowCountMax = (int) (this.rowCount * (1 - linkableRatio));
-        this.unlinkableRowCount = 0;
-
-        r = new Random(randomSeed);
-
-        KylinConfig config = KylinConfig.getInstanceFromEnv();
-        cube = CubeManager.getInstance(config).getCube(cubeName);
-        desc = cube.getDescriptor();
-        factTableName = desc.getFactTable();
-        store = ResourceStore.getStore(config);
-    }
-
-    /*
-     * users can specify the value preference for each column
-     *
-     * Loads that configuration from the resource /data/data_gen_config.json into genConf.
-     * An IOException is only printed, in which case genConf keeps its previous value.
-     */
-    private void loadConfig() {
-        // try-with-resources: the stream is closed even when GenConfig.loadConfig() throws;
-        // a null stream is simply not closed
-        try (InputStream configStream = store.getResource("/data/data_gen_config.json")) {
-            this.genConf = GenConfig.loadConfig(configStream);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-    }
-
-    private void loadLookupTableValues(String lookupTableName, LinkedList<String> columnNames, int distinctRowCount) throws Exception {
-        KylinConfig config = KylinConfig.getInstanceFromEnv();
-
-        // only deal with composite keys
-        if (columnNames.size() > 1 && !lookupTableCompositeKeyValues.containsKey(lookupTableName)) {
-            lookupTableCompositeKeyValues.put(lookupTableName, new HashSet<Array<String>>());
-        }
-
-        InputStream tableStream = null;
-        BufferedReader tableReader = null;
-        try {
-            TreeMap<String, Integer> zeroBasedInice = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
-            for (String columnName : columnNames) {
-                ColumnDesc cDesc = MetadataManager.getInstance(config).getTableDesc(lookupTableName).findColumnByName(columnName);
-                zeroBasedInice.put(columnName, cDesc.getZeroBasedIndex());
-            }
-
-            String path = "/data/" + lookupTableName + ".csv";
-            tableStream = store.getResource(path);
-            tableReader = new BufferedReader(new InputStreamReader(tableStream));
-            tableReader.mark(0);
-            int rowCount = 0;
-            int curRowNum = 0;
-            String curRow;
-
-            while (tableReader.readLine() != null)
-                rowCount++;
-
-            HashSet<Integer> rows = new HashSet<Integer>();
-            distinctRowCount = (distinctRowCount < rowCount) ? distinctRowCount : rowCount;
-            while (rows.size() < distinctRowCount) {
-                rows.add(r.nextInt(rowCount));
-            }
-
-            // reopen the stream
-            tableReader.close();
-            tableStream.close();
-            tableStream = null;
-            tableReader = null;
-
-            tableStream = store.getResource(path);
-            tableReader = new BufferedReader(new InputStreamReader(tableStream));
-
-            while ((curRow = tableReader.readLine()) != null) {
-                if (rows.contains(curRowNum)) {
-                    String[] tokens = curRow.split(",");
-
-                    String[] comboKeys = null;
-                    int index = 0;
-                    if (columnNames.size() > 1)
-                        comboKeys = new String[columnNames.size()];
-
-                    for (String columnName : columnNames) {
-                        int zeroBasedIndex = zeroBasedInice.get(columnName);
-                        if (!feasibleValues.containsKey(lookupTableName + "/" + columnName))
-                            feasibleValues.put(lookupTableName + "/" + columnName, new ArrayList<String>());
-                        feasibleValues.get(lookupTableName + "/" + columnName).add(tokens[zeroBasedIndex]);
-
-                        if (columnNames.size() > 1) {
-                            comboKeys[index] = tokens[zeroBasedIndex];
-                            index++;
-                        }
-                    }
-
-                    if (columnNames.size() > 1) {
-                        Array<String> wrap = new Array<String>(comboKeys);
-                        if (lookupTableCompositeKeyValues.get(lookupTableName).contains(wrap)) {
-                            throw new Exception("The composite key already exist in the lookup table");
-                        }
-                        lookupTableCompositeKeyValues.get(lookupTableName).add(wrap);
-                    }
-                }
-                curRowNum++;
-            }
-
-            if (tableStream != null)
-                tableStream.close();
-            if (tableReader != null)
-                tableReader.close();
-
-        } catch (IOException e) {
-            e.printStackTrace();
-            System.exit(1);
-        }
-    }
-
-    // prepare the candidate values for each joined column
-    private void prepare() throws Exception {
-        // load config
-        loadConfig();
-
-        TreeSet<String> factTableColumns = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
-
-        for (DimensionDesc dim : desc.getDimensions()) {
-            for (TblColRef col : dim.getColumnRefs()) {
-                if (col.getTable().equals(factTableName))
-                    factTableColumns.add(col.getName());
-            }
-
-            JoinDesc join = dim.getJoin();
-            if (join != null) {
-                String lookupTable = dim.getTable();
-                for (String column : join.getPrimaryKey()) {
-                    if (!lookupTableKeys.containsKey(lookupTable)) {
-                        lookupTableKeys.put(lookupTable, new LinkedList<String>());
-                    }
-
-                    if (!lookupTableKeys.get(lookupTable).contains(column))
-                        lookupTableKeys.get(lookupTable).add(column);
-                }
-            }
-        }
-
-        int distinctRowCount = (int) (this.rowCount / this.conflictRatio);
-        distinctRowCount = (distinctRowCount == 0) ? 1 : distinctRowCount;
-        // lookup tables
-        for (String lookupTable : lookupTableKeys.keySet()) {
-            this.loadLookupTableValues(lookupTable, lookupTableKeys.get(lookupTable), distinctRowCount);
-        }
-    }
-
-    private List<DimensionDesc> getSortedDimentsionDescs() {
-        List<DimensionDesc> dimensions = desc.getDimensions();
-        Collections.sort(dimensions, new Comparator<DimensionDesc>() {
-            @Override
-            public int compare(DimensionDesc o1, DimensionDesc o2) {
-                JoinDesc j1 = o2.getJoin();
-                JoinDesc j2 = o1.getJoin();
-                return Integer.valueOf(j1 != null ? j1.getPrimaryKey().length : 0).compareTo(j2 != null ? j2.getPrimaryKey().length : 0);
-            }
-        });
-        return dimensions;
-    }
-
-    /**
-     * Generate the fact table and return it as text
-     *
-     * @return
-     * @throws Exception
-     */
-    private String cookData() throws Exception {
-        // the columns on the fact table can be classified into three groups:
-        // 1. foreign keys
-        TreeMap<String, String> factTableCol2LookupCol = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
-        // 2. metrics or directly used dimensions
-        TreeSet<String> usedCols = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
-        // 3. others, not referenced anywhere
-
-        TreeMap<String, String> lookupCol2factTableCol = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
-
-        // find fact table columns in fks
-        List<DimensionDesc> dimensions = getSortedDimentsionDescs();
-        for (DimensionDesc dim : dimensions) {
-            JoinDesc jDesc = dim.getJoin();
-            if (jDesc != null) {
-                String[] fks = jDesc.getForeignKey();
-                String[] pks = jDesc.getPrimaryKey();
-                int num = fks.length;
-                for (int i = 0; i < num; ++i) {
-                    String value = dim.getTable() + "/" + pks[i];
-
-                    lookupCol2factTableCol.put(value, fks[i]);
-
-                    if (factTableCol2LookupCol.containsKey(fks[i])) {
-                        if (!factTableCol2LookupCol.get(fks[i]).equals(value)) {
-                            System.out.println("Warning: Disambiguation on the mapping of column " + fks[i] + ", " + factTableCol2LookupCol.get(fks[i]) + "(chosen) or " + value);
-                            continue;
-                        }
-                    }
-                    factTableCol2LookupCol.put(fks[i], value);
-                }
-            }
-            //else, deal with it in next roung
-        }
-
-        // find fact table columns in direct dimension
-        // DO NOT merge this with the previous loop
-        for (DimensionDesc dim : dimensions) {
-            JoinDesc jDesc = dim.getJoin();
-            if (jDesc == null) {
-                // column on fact table used directly as a dimension
-                for (String aColumn : dim.getColumn()) {
-                    if (!factTableCol2LookupCol.containsKey(aColumn))
-                        usedCols.add(aColumn);
-                }
-            }
-        }
-
-        // find fact table columns in measures
-        for (MeasureDesc mDesc : desc.getMeasures()) {
-            List<TblColRef> pcols = mDesc.getFunction().getParameter().getColRefs();
-            if (pcols != null) {
-                for (TblColRef col : pcols) {
-                    if (!factTableCol2LookupCol.containsKey(col.getName()))
-                        usedCols.add(col.getName());
-                }
-            }
-        }
-
-        return createTable(this.rowCount, factTableCol2LookupCol, lookupCol2factTableCol, usedCols);
-    }
-
-    private String normToTwoDigits(int v) {
-        if (v < 10)
-            return "0" + v;
-        else
-            return Integer.toString(v);
-    }
-
-    private String randomPick(ArrayList<String> candidates) {
-        int index = r.nextInt(candidates.size());
-        return candidates.get(index);
-    }
-
-    private String createRandomCell(ColumnDesc cDesc, ArrayList<String> range) throws Exception {
-        DataType type = cDesc.getType();
-        if (type.isStringFamily()) {
-            throw new Exception("Can't handle range values for string");
-
-        } else if (type.isIntegerFamily()) {
-            int low = Integer.parseInt(range.get(0));
-            int high = Integer.parseInt(range.get(1));
-            return Integer.toString(r.nextInt(high - low) + low);
-
-        } else if (type.isDouble()) {
-            double low = Double.parseDouble(range.get(0));
-            double high = Double.parseDouble(range.get(1));
-            return String.format("%.4f", r.nextDouble() * (high - low) + low);
-
-        } else if (type.isFloat()) {
-            float low = Float.parseFloat(range.get(0));
-            float high = Float.parseFloat(range.get(1));
-            return String.format("%.4f", r.nextFloat() * (high - low) + low);
-
-        } else if (type.isDecimal()) {
-            double low = Double.parseDouble(range.get(0));
-            double high = Double.parseDouble(range.get(1));
-            return String.format("%.4f", r.nextDouble() * (high - low) + low);
-
-        } else if (type.isDateTimeFamily()) {
-            if (!type.isDate()) {
-                throw new RuntimeException("Does not support " + type);
-            }
-
-            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
-            Date start = format.parse(range.get(0));
-            Date end = format.parse(range.get(1));
-            long diff = end.getTime() - start.getTime();
-            Date temp = new Date(start.getTime() + (long) (diff * r.nextDouble()));
-            Calendar cal = Calendar.getInstance();
-            cal.setTime(temp);
-            // first day
-            cal.set(Calendar.DAY_OF_WEEK, cal.getFirstDayOfWeek());
-
-            return cal.get(Calendar.YEAR) + "-" + normToTwoDigits(cal.get(Calendar.MONTH) + 1) + "-" + normToTwoDigits(cal.get(Calendar.DAY_OF_MONTH));
-        } else {
-            System.out.println("The data type " + type + "is not recognized");
-            System.exit(1);
-        }
-        return null;
-    }
-
-    private String createRandomCell(ColumnDesc cDesc) {
-        String type = cDesc.getTypeName();
-        String s = type.toLowerCase();
-        if (s.equals("string") || s.equals("char") || s.equals("varchar")) {
-            StringBuilder sb = new StringBuilder();
-            for (int i = 0; i < 2; i++) {
-                sb.append((char) ('a' + r.nextInt(10)));// there are 10*10
-                // possible strings
-            }
-            return sb.toString();
-        } else if (s.equals("bigint") || s.equals("int") || s.equals("tinyint") || s.equals("smallint")) {
-            return Integer.toString(r.nextInt(128));
-        } else if (s.equals("double")) {
-            return String.format("%.4f", r.nextDouble() * 100);
-        } else if (s.equals("float")) {
-            return String.format("%.4f", r.nextFloat() * 100);
-        } else if (s.equals("decimal")) {
-            return String.format("%.4f", r.nextDouble() * 100);
-        } else if (s.equals("date")) {
-            long date20131231 = 61349312153265L;
-            long date20010101 = 60939158400000L;
-            long diff = date20131231 - date20010101;
-            Date temp = new Date(date20010101 + (long) (diff * r.nextDouble()));
-            Calendar cal = Calendar.getInstance();
-            cal.setTime(temp);
-            // first day
-            cal.set(Calendar.DAY_OF_WEEK, cal.getFirstDayOfWeek());
-
-            return cal.get(Calendar.YEAR) + "-" + normToTwoDigits(cal.get(Calendar.MONTH) + 1) + "-" + normToTwoDigits(cal.get(Calendar.DAY_OF_MONTH));
-        } else {
-            System.out.println("The data type " + type + "is not recognized");
-            System.exit(1);
-        }
-        return null;
-    }
-
-    private String createDefaultsCell(String type) {
-        String s = type.toLowerCase();
-        if (s.equals("string") || s.equals("char") || s.equals("varchar")) {
-            return "abcde";
-        } else if (s.equals("bigint") || s.equals("int") || s.equals("tinyint") || s.equals("smallint")) {
-            return "0";
-        } else if (s.equals("double")) {
-            return "0";
-        } else if (s.equals("float")) {
-            return "0";
-        } else if (s.equals("decimal")) {
-            return "0";
-        } else if (s.equals("date")) {
-            return "1970-01-01";
-        } else {
-            System.out.println("The data type " + type + "is not recognized");
-            System.exit(1);
-        }
-        return null;
-    }
-
-    private void printColumnMappings(TreeMap<String, String> factTableCol2LookupCol, TreeSet<String> usedCols, TreeSet<String> defaultColumns) {
-
-        System.out.println("=======================================================================");
-        System.out.format("%-30s %s", "FACT_TABLE_COLUMN", "MAPPING");
-        System.out.println();
-        System.out.println();
-        for (Map.Entry<String, String> entry : factTableCol2LookupCol.entrySet()) {
-            System.out.format("%-30s %s", entry.getKey(), entry.getValue());
-            System.out.println();
-        }
-        for (String key : usedCols) {
-            System.out.format("%-30s %s", key, "Random Values");
-            System.out.println();
-        }
-        for (String key : defaultColumns) {
-            System.out.format("%-30s %s", key, "Default Values");
-            System.out.println();
-        }
-        System.out.println("=======================================================================");
-
-        System.out.println("Parameters:");
-        System.out.println();
-        System.out.println("CubeName:        " + cubeName);
-        System.out.println("RowCount:        " + rowCount);
-        System.out.println("ConflictRatio:   " + conflictRatio);
-        System.out.println("LinkableRatio:   " + linkableRatio);
-        System.out.println("Seed:            " + randomSeed);
-        System.out.println();
-        System.out.println("The number of actual unlinkable fact rows is: " + this.unlinkableRowCount);
-        System.out.println("You can vary the above parameters to generate different datasets.");
-        System.out.println();
-    }
-
-    // Any row in the column must finally appear in the flatten big table.
-    // for single-column joins the generated row is guaranteed to have a match
-    // in lookup table
-    // for composite keys we'll need an extra check
-    private boolean matchAllCompositeKeys(TreeMap<String, String> lookupCol2FactTableCol, LinkedList<String> columnValues) {
-        KylinConfig config = KylinConfig.getInstanceFromEnv();
-
-        for (String lookupTable : lookupTableKeys.keySet()) {
-            if (lookupTableKeys.get(lookupTable).size() == 1)
-                continue;
-
-            String[] comboKey = new String[lookupTableKeys.get(lookupTable).size()];
-            int index = 0;
-            for (String column : lookupTableKeys.get(lookupTable)) {
-                String key = lookupTable + "/" + column;
-                String factTableCol = lookupCol2FactTableCol.get(key);
-                int cardinal = MetadataManager.getInstance(config).getTableDesc(factTableName).findColumnByName(factTableCol).getZeroBasedIndex();
-                comboKey[index] = columnValues.get(cardinal);
-
-                index++;
-            }
-            Array<String> wrap = new Array<String>(comboKey);
-            if (!lookupTableCompositeKeyValues.get(lookupTable).contains(wrap)) {
-                // System.out.println("Try " + wrap + " Failed, continue...");
-                return false;
-            }
-        }
-        return true;
-    }
-
-    private String createCell(ColumnDesc cDesc) throws Exception {
-        ColumnConfig cConfig = null;
-
-        if ((cConfig = genConf.getColumnConfigByName(cDesc.getName())) == null) {
-            // if the column is not configured, use random values
-            return (createRandomCell(cDesc));
-
-        } else {
-            // the column has a configuration
-            if (!cConfig.isAsRange() && !cConfig.isExclusive() && r.nextBoolean()) {
-                // if the column still allows random values
-                return (createRandomCell(cDesc));
-
-            } else {
-                // use specified values
-                ArrayList<String> valueSet = cConfig.getValueSet();
-                if (valueSet == null || valueSet.size() == 0)
-                    throw new Exception("Did you forget to specify value set for " + cDesc.getName());
-
-                if (!cConfig.isAsRange()) {
-                    return (randomPick(valueSet));
-                } else {
-                    if (valueSet.size() != 2)
-                        throw new Exception("Only two values can be set for range values, the column: " + cDesc.getName());
-
-                    return (createRandomCell(cDesc, valueSet));
-                }
-            }
-
-        }
-    }
-
-    private LinkedList<String> createRow(TreeMap<String, String> factTableCol2LookupCol, TreeSet<String> usedCols, TreeSet<String> defaultColumns) throws Exception {
-        KylinConfig config = KylinConfig.getInstanceFromEnv();
-        LinkedList<String> columnValues = new LinkedList<String>();
-
-        for (ColumnDesc cDesc : MetadataManager.getInstance(config).getTableDesc(factTableName).getColumns()) {
-
-            String colName = cDesc.getName();
-
-            if (factTableCol2LookupCol.containsKey(colName)) {
-
-                // if the current column is a fk column in fact table
-                ArrayList<String> candidates = this.feasibleValues.get(factTableCol2LookupCol.get(colName));
-
-                columnValues.add(candidates.get(r.nextInt(candidates.size())));
-            } else if (usedCols.contains(colName)) {
-
-                // if the current column is a metric column in fact table
-                columnValues.add(createCell(cDesc));
-            } else {
-
-                // otherwise this column is not useful in OLAP
-                columnValues.add(createDefaultsCell(cDesc.getTypeName()));
-                defaultColumns.add(colName);
-            }
-        }
-
-        return columnValues;
-    }
-
-    /**
-     * return the text of table contents(one line one row)
-     *
-     * @param rowCount
-     * @param factTableCol2LookupCol
-     * @param lookupCol2FactTableCol
-     * @param usedCols
-     * @return
-     * @throws Exception
-     */
-    private String createTable(int rowCount, TreeMap<String, String> factTableCol2LookupCol, TreeMap<String, String> lookupCol2FactTableCol, TreeSet<String> usedCols) throws Exception {
-        try {
-            TreeSet<String> defaultColumns = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
-
-            StringBuffer sb = new StringBuffer();
-            for (int i = 0; i < rowCount;) {
-
-                LinkedList<String> columnValues = createRow(factTableCol2LookupCol, usedCols, defaultColumns);
-
-                if (!matchAllCompositeKeys(lookupCol2FactTableCol, columnValues)) {
-                    if (unlinkableRowCount < unlinkableRowCountMax) {
-                        unlinkableRowCount++;
-                    } else {
-                        continue;
-                    }
-                }
-
-                for (String c : columnValues)
-                    sb.append(c + ",");
-                sb.deleteCharAt(sb.length() - 1);
-                sb.append(System.getProperty("line.separator"));
-
-                i++;
-
-                // System.out.println("Just generated the " + i + "th record");
-            }
-
-            printColumnMappings(factTableCol2LookupCol, usedCols, defaultColumns);
-
-            return sb.toString();
-
-        } catch (IOException e) {
-            e.printStackTrace();
-            System.exit(1);
-        }
-
-        return null;
-    }
-
-    /**
-     * Randomly create a fact table and return the table content
-     *
-     * @param cubeName      name of the cube
-     * @param rowCount      expected row count generated
-     * @param linkableRatio the percentage of fact table rows that can be linked with all
-     *                      lookup table by INNER join
-     * @param randomSeed    random seed
-     */
-    public static String generate(String cubeName, String rowCount, String linkableRatio, String randomSeed) throws Exception {
-
-        if (rowCount == null)
-            rowCount = "10000";
-        if (linkableRatio == null)
-            linkableRatio = "0.6";
-
-        //if (randomSeed == null)
-        // don't give it value
-
-        // String conflictRatio = "5";//this parameter do not allow configuring
-        // any more
-
-        FactTableGenerator generator = new FactTableGenerator();
-        long seed;
-        if (randomSeed != null) {
-            seed = Long.parseLong(randomSeed);
-        } else {
-            Random r = new Random();
-            seed = r.nextLong();
-        }
-
-        generator.init(cubeName, Integer.parseInt(rowCount), 5, Double.parseDouble(linkableRatio), seed);
-        generator.prepare();
-        return generator.cookData();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/dataGen/GenConfig.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/dataGen/GenConfig.java b/job/src/test/java/org/apache/kylin/job/dataGen/GenConfig.java
deleted file mode 100644
index c58cfb6..0000000
--- a/job/src/test/java/org/apache/kylin/job/dataGen/GenConfig.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.dataGen;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-
-import org.apache.kylin.common.util.JsonUtil;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.databind.JsonMappingException;
-
-/**
- */
-@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
-public class GenConfig {
-
-    @JsonProperty("columnConfigs")
-    private ArrayList<ColumnConfig> columnConfigs;
-
-    private HashMap<String, ColumnConfig> cache = new HashMap<String, ColumnConfig>();
-
-    public ArrayList<ColumnConfig> getColumnConfigs() {
-        return columnConfigs;
-    }
-
-    public void setColumnConfigs(ArrayList<ColumnConfig> columnConfigs) {
-        this.columnConfigs = columnConfigs;
-    }
-
-    public ColumnConfig getColumnConfigByName(String columnName) {
-        columnName = columnName.toLowerCase();
-
-        if (cache.containsKey(columnName))
-            return cache.get(columnName);
-
-        for (ColumnConfig cConfig : columnConfigs) {
-            if (cConfig.getColumnName().toLowerCase().equals(columnName)) {
-                cache.put(columnName, cConfig);
-                return cConfig;
-            }
-        }
-        cache.put(columnName, null);
-        return null;
-    }
-
-    public static GenConfig loadConfig(InputStream stream) {
-        try {
-            GenConfig config = JsonUtil.readValue(stream, GenConfig.class);
-            return config;
-        } catch (JsonMappingException e) {
-            e.printStackTrace();
-        } catch (JsonParseException e) {
-            e.printStackTrace();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java b/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java
deleted file mode 100644
index ebc4114..0000000
--- a/job/src/test/java/org/apache/kylin/job/dataGen/StreamingDataGenerator.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.apache.kylin.job.dataGen;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.common.util.TimeUtil;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Maps;
-
-/**
- * data gen for II streaming, may be merged with StreamingTableDataGenerator
- */
-public class StreamingDataGenerator {
-    private static final Logger logger = LoggerFactory.getLogger(StreamingDataGenerator.class);
-    private static Random random = new Random();
-    private static String[] decimalFormat = new String[] { "%.4f", "%.5f", "%.6f" };
-
-    public static Iterator<String> generate(final long start, final long end, final int count) {
-        final KylinConfig config = KylinConfig.getInstanceFromEnv();
-        final IIInstance ii = IIManager.getInstance(config).getII("test_streaming_table_ii");
-        final IIDesc iiDesc = ii.getDescriptor();
-        final List<TblColRef> columns = iiDesc.listAllColumns();
-
-        return new Iterator<String>() {
-            private Map<String, String> values = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
-            private int index = 0;
-
-            @Override
-            public boolean hasNext() {
-                return this.index < count;
-            }
-
-            @Override
-            public String next() {
-                values.clear();
-                long ts = this.createTs(start, end);
-                values.put("minute_start", Long.toString(TimeUtil.getMinuteStart(ts)));
-                values.put("hour_start", Long.toString(TimeUtil.getHourStart(ts)));
-                values.put("day_start", Long.toString(TimeUtil.getDayStart(ts)));
-                values.put("itm", Integer.toString(random.nextInt(20)));
-                values.put("site", Integer.toString(random.nextInt(5)));
-
-                values.put("gmv", String.format(decimalFormat[random.nextInt(3)], random.nextFloat() * 100));
-                values.put("item_count", Integer.toString(random.nextInt(5)));
-
-                if (values.size() != columns.size()) {
-                    throw new RuntimeException("the structure of streaming table has changed, need to modify generator too");
-                }
-
-                ByteArrayOutputStream os = new ByteArrayOutputStream();
-                try {
-                    JsonUtil.writeValue(os, values);
-                } catch (IOException e) {
-                    e.printStackTrace();
-                    throw new RuntimeException(e);
-                }
-                index++;
-                return new String(os.toByteArray());
-            }
-
-            @Override
-            public void remove() {
-            }
-
-            private long createTs(final long start, final long end) {
-                return start + (long) (random.nextDouble() * (end - start));
-            }
-        };
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
deleted file mode 100644
index dcd460b..0000000
--- a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ /dev/null
@@ -1,240 +0,0 @@
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.kylin.common.util.FIFOIterable;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodecWithState;
-import org.apache.kylin.invertedindex.model.IIRow;
-import org.apache.kylin.invertedindex.model.KeyValueCodec;
-import org.apache.kylin.metadata.filter.ColumnTupleFilter;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.ConstantTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.ParameterDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
-import org.apache.kylin.storage.hbase.common.coprocessor.FilterDecorator;
-import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.ClearTextDictionary;
-import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.EndpointAggregators;
-import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.IIEndpoint;
-import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.generated.IIProtos;
-import org.apache.kylin.streaming.MicroStreamBatch;
-import org.apache.kylin.streaming.ParsedStreamMessage;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.StreamParser;
-import org.apache.kylin.streaming.StringStreamParser;
-import org.apache.kylin.streaming.invertedindex.SliceBuilder;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-/**
- */
-public class IITest extends LocalFileMetadataTestCase {
-
-    String iiName = "test_kylin_ii_inner_join";
-    IIInstance ii;
-    IIDesc iiDesc;
-    String cubeName = "test_kylin_cube_with_slr_empty";
-
-    List<IIRow> iiRows;
-
-    final String[] inputData = new String[] { //
-    "FP-non GTC,0,15,145970,0,28,Toys,2008-10-08 07:18:40,USER_Y,Toys & Hobbies,Models & Kits,Automotive,0,Ebay,USER_S,15,Professional-Other,2012-08-16,2012-08-11,0,2012-08-16,145970,10000329,26.8551,0", //
-            "ABIN,0,-99,43479,0,21,Photo,2012-09-11 20:26:04,USER_Y,Cameras & Photo,Film Photography,Other,0,Ebay,USER_S,-99,Not Applicable,2012-08-16,2012-08-11,0,2012-08-16,43479,10000807,26.2474,0", //
-            "ABIN,0,16,80053,0,12,Computers,2012-06-19 21:15:09,USER_Y,Computers/Tablets & Networking,MonitorProjectors & Accs,Monitors,0,Ebay,USER_S,16,Consumer-Other,2012-08-16,2012-08-11,0,2012-08-16,80053,10000261,94.2273,0" };
-
-    @Before
-    public void setUp() throws Exception {
-        this.createTestMetadata();
-        this.ii = IIManager.getInstance(getTestConfig()).getII(iiName);
-        this.iiDesc = ii.getDescriptor();
-
-        List<StreamMessage> streamMessages = Lists.transform(Arrays.asList(inputData), new Function<String, StreamMessage>() {
-            @Nullable
-            @Override
-            public StreamMessage apply(String input) {
-                return new StreamMessage(System.currentTimeMillis(), input.getBytes());
-            }
-        });
-
-        List<List<String>> parsedStreamMessages = Lists.newArrayList();
-        StreamParser parser = StringStreamParser.instance;
-
-        MicroStreamBatch batch = new MicroStreamBatch(0);
-        for (StreamMessage message : streamMessages) {
-            ParsedStreamMessage parsedStreamMessage = parser.parse(message);
-            if ((parsedStreamMessage.isAccepted())) {
-                batch.add(parsedStreamMessage);
-            }
-        }
-
-        iiRows = Lists.newArrayList();
-        final Slice slice = new SliceBuilder(iiDesc, (short) 0, true).buildSlice((batch));
-        IIKeyValueCodec codec = new IIKeyValueCodec(slice.getInfo());
-        for (IIRow iiRow : codec.encodeKeyValue(slice)) {
-            iiRows.add(iiRow);
-        }
-    }
-
-    @After
-    public void after() throws Exception {
-        cleanupTestMetadata();
-    }
-
-    /**
-     * simulate stream building into slices, and encode the slice into IIRows.
-     * Then reconstruct the IIRows to slice.
-     */
-    @Test
-    public void basicTest() {
-        Queue<IIRow> buffer = Lists.newLinkedList();
-        FIFOIterable bufferIterable = new FIFOIterable(buffer);
-        TableRecordInfo info = new TableRecordInfo(iiDesc);
-        TableRecordInfoDigest digest = info.getDigest();
-        KeyValueCodec codec = new IIKeyValueCodecWithState(digest);
-        Iterator<Slice> slices = codec.decodeKeyValue(bufferIterable).iterator();
-
-        Assert.assertTrue(!slices.hasNext());
-        Assert.assertEquals(iiRows.size(), digest.getColumnCount());
-
-        for (int i = 0; i < digest.getColumnCount(); ++i) {
-            buffer.add(iiRows.get(i));
-
-            if (i != digest.getColumnCount() - 1) {
-                Assert.assertTrue(!slices.hasNext());
-            } else {
-                Assert.assertTrue(slices.hasNext());
-            }
-        }
-
-        Slice newSlice = slices.next();
-        Assert.assertEquals(newSlice.getLocalDictionaries()[0].getSize(), 2);
-    }
-
-    @Test
-    public void IIEndpointTest() {
-        TableRecordInfo info = new TableRecordInfo(ii.getDescriptor());
-        CoprocessorRowType type = CoprocessorRowType.fromTableRecordInfo(info, ii.getFirstSegment().getColumns());
-        CoprocessorProjector projector = CoprocessorProjector.makeForEndpoint(info, Collections.singletonList(ii.getDescriptor().findColumnRef("default.test_kylin_fact", "lstg_format_name")));
-
-        FunctionDesc f1 = new FunctionDesc();
-        f1.setExpression("SUM");
-        ParameterDesc p1 = new ParameterDesc();
-        p1.setType("column");
-        p1.setValue("PRICE");
-        f1.setParameter(p1);
-        f1.setReturnType("decimal(19,4)");
-
-        TblColRef column = ii.getDescriptor().findColumnRef("default.test_kylin_fact", "cal_dt");
-        CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GTE);
-        ColumnTupleFilter columnFilter = new ColumnTupleFilter(column);
-        compareFilter.addChild(columnFilter);
-        ConstantTupleFilter constantFilter = null;
-        constantFilter = new ConstantTupleFilter(("2012-08-16"));
-        compareFilter.addChild(constantFilter);
-
-        EndpointAggregators aggregators = EndpointAggregators.fromFunctions(info, Collections.singletonList(f1));
-        CoprocessorFilter filter = CoprocessorFilter.fromFilter(new ClearTextDictionary(info), compareFilter, FilterDecorator.FilterConstantsTreatment.AS_IT_IS);
-
-        final Iterator<IIRow> iiRowIterator = iiRows.iterator();
-
-        IIEndpoint endpoint = new IIEndpoint();
-        IIProtos.IIResponseInternal response = endpoint.getResponse(new RegionScanner() {
-            @Override
-            public HRegionInfo getRegionInfo() {
-                throw new NotImplementedException();
-            }
-
-            @Override
-            public boolean isFilterDone() throws IOException {
-                throw new NotImplementedException();
-            }
-
-            @Override
-            public boolean reseek(byte[] row) throws IOException {
-                throw new NotImplementedException();
-            }
-
-            @Override
-            public long getMaxResultSize() {
-                throw new NotImplementedException();
-
-            }
-
-            @Override
-            public long getMvccReadPoint() {
-                throw new NotImplementedException();
-            }
-
-            @Override
-            public boolean nextRaw(List<Cell> result) throws IOException {
-                if (iiRowIterator.hasNext()) {
-                    IIRow iiRow = iiRowIterator.next();
-                    result.addAll(iiRow.makeCells());
-                    return true;
-                } else {
-                    return false;
-                }
-            }
-
-            @Override
-            public boolean nextRaw(List<Cell> result, int limit) throws IOException {
-                throw new NotImplementedException();
-            }
-
-            @Override
-            public boolean next(List<Cell> results) throws IOException {
-                throw new NotImplementedException();
-            }
-
-            @Override
-            public boolean next(List<Cell> result, int limit) throws IOException {
-                throw new NotImplementedException();
-            }
-
-            @Override
-            public void close() throws IOException {
-                throw new NotImplementedException();
-            }
-        }, type, projector, aggregators, filter);
-
-        Assert.assertEquals(2, response.getRowsList().size());
-        System.out.println(response.getRowsList().size());
-        Set<String> answers = Sets.newHashSet("120.4747", "26.8551");
-        for (IIProtos.IIResponseInternal.IIRow responseRow : response.getRowsList()) {
-            byte[] measuresBytes = responseRow.getMeasures().toByteArray();
-            List<Object> metrics = aggregators.deserializeMetricValues(measuresBytes, 0);
-            Assert.assertTrue(answers.contains(metrics.get(0)));
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
deleted file mode 100644
index fd2eceb..0000000
--- a/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.impl.threadpool;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.job.DeployUtil;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.lock.MockJobLock;
-import org.apache.kylin.job.manager.ExecutableManager;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-
-/**
- */
-public abstract class BaseSchedulerTest extends LocalFileMetadataTestCase {
-
-    private DefaultScheduler scheduler;
-
-    protected ExecutableManager jobService;
-
-    static void setFinalStatic(Field field, Object newValue) throws Exception {
-        field.setAccessible(true);
-
-        Field modifiersField = Field.class.getDeclaredField("modifiers");
-        modifiersField.setAccessible(true);
-        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
-
-        field.set(null, newValue);
-    }
-
-    protected void waitForJobFinish(String jobId) {
-        while (true) {
-            AbstractExecutable job = jobService.getJob(jobId);
-            final ExecutableState status = job.getStatus();
-            if (status == ExecutableState.SUCCEED || status == ExecutableState.ERROR || status == ExecutableState.STOPPED || status == ExecutableState.DISCARDED) {
-                break;
-            } else {
-                try {
-                    Thread.sleep(5000);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
-
-    protected void waitForJobStatus(String jobId, ExecutableState state, long interval) {
-        while (true) {
-            AbstractExecutable job = jobService.getJob(jobId);
-            if (job.getStatus() == state) {
-                break;
-            } else {
-                try {
-                    Thread.sleep(interval);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
-
-    @BeforeClass
-    public static void beforeClass() {
-        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
-        DeployUtil.overrideJobJarLocations();
-    }
-
-    @Before
-    public void setup() throws Exception {
-        createTestMetadata();
-        setFinalStatic(ExecutableConstants.class.getField("DEFAULT_SCHEDULER_INTERVAL_SECONDS"), 10);
-        jobService = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
-        scheduler = DefaultScheduler.getInstance();
-        scheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv()), new MockJobLock());
-        if (!scheduler.hasStarted()) {
-            throw new RuntimeException("scheduler has not been started");
-        }
-
-    }
-
-    @After
-    public void after() throws Exception {
-        cleanupTestMetadata();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java b/job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
deleted file mode 100644
index 7c33d02..0000000
--- a/job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.impl.threadpool;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.kylin.job.BaseTestExecutable;
-import org.apache.kylin.job.ErrorTestExecutable;
-import org.apache.kylin.job.FailedTestExecutable;
-import org.apache.kylin.job.SelfStopExecutable;
-import org.apache.kylin.job.SucceedTestExecutable;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.junit.Test;
-
-/**
- */
-public class DefaultSchedulerTest extends BaseSchedulerTest {
-
-    @Test
-    public void testSingleTaskJob() throws Exception {
-        DefaultChainedExecutable job = new DefaultChainedExecutable();
-        BaseTestExecutable task1 = new SucceedTestExecutable();
-        job.addTask(task1);
-        jobService.addJob(job);
-        waitForJobFinish(job.getId());
-        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
-        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
-    }
-
-    @Test
-    public void testSucceed() throws Exception {
-        DefaultChainedExecutable job = new DefaultChainedExecutable();
-        BaseTestExecutable task1 = new SucceedTestExecutable();
-        BaseTestExecutable task2 = new SucceedTestExecutable();
-        job.addTask(task1);
-        job.addTask(task2);
-        jobService.addJob(job);
-        waitForJobFinish(job.getId());
-        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
-        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
-        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState());
-    }
-
-    @Test
-    public void testSucceedAndFailed() throws Exception {
-        DefaultChainedExecutable job = new DefaultChainedExecutable();
-        BaseTestExecutable task1 = new SucceedTestExecutable();
-        BaseTestExecutable task2 = new FailedTestExecutable();
-        job.addTask(task1);
-        job.addTask(task2);
-        jobService.addJob(job);
-        waitForJobFinish(job.getId());
-        assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
-        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
-        assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState());
-    }
-
-    @Test
-    public void testSucceedAndError() throws Exception {
-        DefaultChainedExecutable job = new DefaultChainedExecutable();
-        BaseTestExecutable task1 = new ErrorTestExecutable();
-        BaseTestExecutable task2 = new SucceedTestExecutable();
-        job.addTask(task1);
-        job.addTask(task2);
-        jobService.addJob(job);
-        waitForJobFinish(job.getId());
-        assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
-        assertEquals(ExecutableState.ERROR, jobService.getOutput(task1.getId()).getState());
-        assertEquals(ExecutableState.READY, jobService.getOutput(task2.getId()).getState());
-    }
-
-    @Test
-    public void testDiscard() throws Exception {
-        DefaultChainedExecutable job = new DefaultChainedExecutable();
-        BaseTestExecutable task1 = new SelfStopExecutable();
-        job.addTask(task1);
-        jobService.addJob(job);
-        waitForJobStatus(job.getId(), ExecutableState.RUNNING, 500);
-        jobService.discardJob(job.getId());
-        waitForJobFinish(job.getId());
-        assertEquals(ExecutableState.DISCARDED, jobService.getOutput(job.getId()).getState());
-        assertEquals(ExecutableState.DISCARDED, jobService.getOutput(task1.getId()).getState());
-        Thread.sleep(5000);
-        System.out.println(job);
-    }
-
-    @Test
-    public void testSchedulerPool() throws InterruptedException {
-        ScheduledExecutorService fetchPool = Executors.newScheduledThreadPool(1);
-        final CountDownLatch countDownLatch = new CountDownLatch(3);
-        ScheduledFuture future = fetchPool.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                countDownLatch.countDown();
-            }
-        }, 5, 5, TimeUnit.SECONDS);
-        assertTrue("countDownLatch should reach zero in 15 secs", countDownLatch.await(20, TimeUnit.SECONDS));
-        assertTrue("future should still running", future.cancel(true));
-
-        final CountDownLatch countDownLatch2 = new CountDownLatch(3);
-        ScheduledFuture future2 = fetchPool.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                countDownLatch2.countDown();
-                throw new RuntimeException();
-            }
-        }, 5, 5, TimeUnit.SECONDS);
-        assertFalse("countDownLatch2 should NOT reach zero in 15 secs", countDownLatch2.await(20, TimeUnit.SECONDS));
-        assertFalse("future2 should has been stopped", future2.cancel(true));
-
-        final CountDownLatch countDownLatch3 = new CountDownLatch(3);
-        ScheduledFuture future3 = fetchPool.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    countDownLatch3.countDown();
-                    throw new RuntimeException();
-                } catch (Exception e) {
-                }
-            }
-        }, 5, 5, TimeUnit.SECONDS);
-        assertTrue("countDownLatch3 should reach zero in 15 secs", countDownLatch3.await(20, TimeUnit.SECONDS));
-        assertTrue("future3 should still running", future3.cancel(true));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
deleted file mode 100644
index be4fa26..0000000
--- a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package org.apache.kylin.job.streaming;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import org.apache.hadoop.util.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.job.DeployUtil;
-import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
-import org.apache.kylin.streaming.StreamBuilder;
-import org.apache.kylin.streaming.StreamMessage;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-@Ignore
-public class CubeStreamConsumerTest {
-
-    private static final Logger logger = LoggerFactory.getLogger(CubeStreamConsumerTest.class);
-
-    private KylinConfig kylinConfig;
-
-    private static final String CUBE_NAME = "test_kylin_cube_without_slr_left_join_ready";
-
-    @BeforeClass
-    public static void beforeClass() throws Exception {
-        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this
-    }
-
-    @Before
-    public void before() throws Exception {
-        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
-
-        kylinConfig = KylinConfig.getInstanceFromEnv();
-        DeployUtil.initCliWorkDir();
-        DeployUtil.deployMetadata();
-        DeployUtil.overrideJobJarLocations();
-        final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(CUBE_NAME);
-        CubeUpdate cubeBuilder = new CubeUpdate(cube);
-        cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
-        // remove all existing segments
-        CubeManager.getInstance(kylinConfig).updateCube(cubeBuilder);
-
-    }
-
-    @Test
-    public void test() throws Exception {
-        LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<>();
-        List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
-        queues.add(queue);
-        StreamBuilder cubeStreamBuilder = StreamBuilder.newPeriodicalStreamBuilder(CUBE_NAME, queues, new CubeStreamConsumer(CUBE_NAME), System.currentTimeMillis(), 30L * 1000);
-        final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
-        loadDataFromLocalFile(queue, 100000);
-        future.get();
-    }
-
-    private void loadDataFromLocalFile(BlockingQueue<StreamMessage> queue, final int maxCount) throws IOException, InterruptedException {
-        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("../table.txt")));
-        String line;
-        int count = 0;
-        while ((line = br.readLine()) != null && count++ < maxCount) {
-            final List<String> strings = Arrays.asList(line.split("\t"));
-            queue.put(new StreamMessage(System.currentTimeMillis(), StringUtils.join(",", strings).getBytes()));
-        }
-        queue.put(StreamMessage.EOF);
-    }
-}