You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/09/19 02:27:54 UTC

[05/12] incubator-kylin git commit: KYLIN-1010 Decompose project job

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java
deleted file mode 100644
index c9ad448..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author yangli9
- */
-public class InvertedIndexJob extends AbstractHadoopJob {
-    protected static final Logger logger = LoggerFactory.getLogger(InvertedIndexJob.class);
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_JOB_NAME);
-            options.addOption(OPTION_II_NAME);
-            options.addOption(OPTION_TABLE_NAME);
-            options.addOption(OPTION_OUTPUT_PATH);
-            parseOptions(options, args);
-
-            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
-            String iiname = getOptionValue(OPTION_II_NAME);
-            String intermediateTable = getOptionValue(OPTION_TABLE_NAME);
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-
-            // ----------------------------------------------------------------------------
-
-            System.out.println("Starting: " + job.getJobName());
-
-            IIInstance ii = getII(iiname);
-            short sharding = ii.getDescriptor().getSharding();
-
-            setJobClasspath(job);
-
-            setupMapper(intermediateTable);
-            setupReducer(output, sharding);
-            attachMetadata(ii);
-
-            return waitForCompletion(job);
-
-        } catch (Exception e) {
-            printUsage(options);
-            throw e;
-        } finally {
-            if (job != null)
-                cleanupTempConfFile(job.getConfiguration());
-        }
-
-    }
-
-    private IIInstance getII(String iiName) {
-        IIManager mgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
-        IIInstance ii = mgr.getII(iiName);
-        if (ii == null)
-            throw new IllegalArgumentException("No Inverted Index found by name " + iiName);
-        return ii;
-    }
-
-    private void attachMetadata(IIInstance ii) throws IOException {
-
-        Configuration conf = job.getConfiguration();
-        attachKylinPropsAndMetadata(ii, conf);
-
-        IISegment seg = ii.getFirstSegment();
-        conf.set(BatchConstants.CFG_II_NAME, ii.getName());
-        conf.set(BatchConstants.CFG_II_SEGMENT_NAME, seg.getName());
-    }
-
-    protected void attachKylinPropsAndMetadata(IIInstance ii, Configuration conf) throws IOException {
-        MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
-
-        // write II / model_desc / II_desc / dict / table
-        ArrayList<String> dumpList = new ArrayList<String>();
-        dumpList.add(ii.getResourcePath());
-        dumpList.add(ii.getDescriptor().getModel().getResourcePath());
-        dumpList.add(ii.getDescriptor().getResourcePath());
-
-        for (String tableName : ii.getDescriptor().getModel().getAllTables()) {
-            TableDesc table = metaMgr.getTableDesc(tableName);
-            dumpList.add(table.getResourcePath());
-        }
-        for (IISegment segment : ii.getSegments()) {
-            dumpList.addAll(segment.getDictionaryPaths());
-        }
-
-        attachKylinPropsAndMetadata(dumpList, conf);
-    }
-
-    private void setupMapper(String intermediateTable) throws IOException {
-
-        String[] dbTableNames = HadoopUtil.parseHiveTableName(intermediateTable);
-        HCatInputFormat.setInput(job, dbTableNames[0], dbTableNames[1]);
-
-        job.setInputFormatClass(HCatInputFormat.class);
-
-        job.setMapperClass(InvertedIndexMapper.class);
-        job.setMapOutputKeyClass(LongWritable.class);
-        job.setMapOutputValueClass(ImmutableBytesWritable.class);
-        job.setPartitionerClass(InvertedIndexPartitioner.class);
-    }
-
-    private void setupReducer(Path output, short sharding) throws IOException {
-        job.setReducerClass(InvertedIndexReducer.class);
-        job.setOutputFormatClass(SequenceFileOutputFormat.class);
-        job.setOutputKeyClass(ImmutableBytesWritable.class);
-        job.setOutputValueClass(ImmutableBytesWritable.class);
-
-        job.setNumReduceTasks(sharding);
-
-        FileOutputFormat.setOutputPath(job, output);
-
-        job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
-
-        deletePath(job.getConfiguration(), output);
-    }
-
-    public static void main(String[] args) throws Exception {
-        InvertedIndexJob job = new InvertedIndexJob();
-        int exitCode = ToolRunner.run(job, args);
-        System.exit(exitCode);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
deleted file mode 100644
index bc43b65..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hive.hcatalog.data.HCatRecord;
-import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
-import org.apache.hive.hcatalog.data.schema.HCatSchema;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.index.TableRecord;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-
-/**
- * @author yangli9
- */
-public class InvertedIndexMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, LongWritable, ImmutableBytesWritable> {
-
-    private TableRecordInfo info;
-    private TableRecord rec;
-
-    private LongWritable outputKey;
-    private ImmutableBytesWritable outputValue;
-    private HCatSchema schema = null;
-    private List<HCatFieldSchema> fields;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-
-        Configuration conf = context.getConfiguration();
-
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-        IIManager mgr = IIManager.getInstance(config);
-        IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME));
-        IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW);
-        this.info = new TableRecordInfo(seg);
-        this.rec = this.info.createTableRecord();
-
-        outputKey = new LongWritable();
-        outputValue = new ImmutableBytesWritable(rec.getBytes());
-
-        schema = HCatInputFormat.getTableSchema(context.getConfiguration());
-
-        fields = schema.getFields();
-    }
-
-    @Override
-    public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
-
-        rec.reset();
-        for (int i = 0; i < fields.size(); i++) {
-            Object fieldValue = record.get(i);
-            rec.setValueString(i, fieldValue == null ? null : fieldValue.toString());
-        }
-
-        outputKey.set(rec.getTimestamp());
-        // outputValue's backing bytes array is the same as rec
-
-        context.write(outputKey, outputValue);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
deleted file mode 100644
index 396c221..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.index.TableRecord;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-
-/**
- * @author yangli9
- */
-public class InvertedIndexPartitioner extends Partitioner<LongWritable, ImmutableBytesWritable> implements Configurable {
-
-    private Configuration conf;
-    private TableRecordInfo info;
-    private TableRecord rec;
-
-    @Override
-    public int getPartition(LongWritable key, ImmutableBytesWritable value, int numPartitions) {
-        rec.setBytes(value.get(), value.getOffset(), value.getLength());
-        return rec.getShard();
-    }
-
-    @Override
-    public void setConf(Configuration conf) {
-        this.conf = conf;
-        try {
-            KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-            IIManager mgr = IIManager.getInstance(config);
-            IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME));
-            IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW);
-            this.info = new TableRecordInfo(seg);
-            this.rec = this.info.createTableRecord();
-        } catch (IOException e) {
-            throw new RuntimeException("", e);
-        }
-    }
-
-    @Override
-    public Configuration getConf() {
-        return conf;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
deleted file mode 100644
index 5a69eec..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.KylinReducer;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.index.IncrementalSliceMaker;
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecord;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.apache.kylin.invertedindex.model.IIRow;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-
-/**
- * @author yangli9
- */
-public class InvertedIndexReducer extends KylinReducer<LongWritable, ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {
-
-    private TableRecordInfo info;
-    private TableRecord rec;
-    private IncrementalSliceMaker builder;
-    private IIKeyValueCodec kv;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-
-        Configuration conf = context.getConfiguration();
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-        IIManager mgr = IIManager.getInstance(config);
-        IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME));
-        IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW);
-        info = new TableRecordInfo(seg);
-        rec = info.createTableRecord();
-        builder = null;
-        kv = new IIKeyValueCodec(info.getDigest());
-    }
-
-    @Override
-    public void reduce(LongWritable key, Iterable<ImmutableBytesWritable> values, Context context) //
-            throws IOException, InterruptedException {
-        for (ImmutableBytesWritable v : values) {
-            rec.setBytes(v.get(), v.getOffset(), v.getLength());
-
-            if (builder == null) {
-                builder = new IncrementalSliceMaker(info, rec.getShard());
-            }
-
-            //TODO: to delete this log
-            System.out.println(rec.getShard() + " - " + rec);
-
-            Slice slice = builder.append(rec);
-            if (slice != null) {
-                output(slice, context);
-            }
-        }
-    }
-
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        Slice slice = builder.close();
-        if (slice != null) {
-            output(slice, context);
-        }
-    }
-
-    private void output(Slice slice, Context context) throws IOException, InterruptedException {
-        for (IIRow pair : kv.encodeKeyValue(slice)) {
-            context.write(pair.getKey(), pair.getValue());
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/main/java/org/apache/kylin/job/invertedindex/IIJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/invertedindex/IIJob.java b/job/src/main/java/org/apache/kylin/job/invertedindex/IIJob.java
deleted file mode 100644
index 0af846b..0000000
--- a/job/src/main/java/org/apache/kylin/job/invertedindex/IIJob.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.invertedindex;
-
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-
-/**
- */
-public class IIJob extends DefaultChainedExecutable {
-
-    public IIJob() {
-        super();
-    }
-
-    private static final String II_INSTANCE_NAME = "iiName";
-    private static final String SEGMENT_ID = "segmentId";
-
-    void setIIName(String name) {
-        setParam(II_INSTANCE_NAME, name);
-    }
-
-    public String getIIName() {
-        return getParam(II_INSTANCE_NAME);
-    }
-
-    void setSegmentId(String segmentId) {
-        setParam(SEGMENT_ID, segmentId);
-    }
-
-    public String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java b/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
deleted file mode 100644
index 4bd06c5..0000000
--- a/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.invertedindex;
-
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.TimeZone;
-
-import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
-import org.apache.kylin.engine.mr.common.MapReduceExecutable;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.hadoop.dict.CreateInvertedIndexDictionaryJob;
-import org.apache.kylin.job.hadoop.invertedindex.IIBulkLoadJob;
-import org.apache.kylin.job.hadoop.invertedindex.IICreateHFileJob;
-import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
-import org.apache.kylin.job.hadoop.invertedindex.IIDistinctColumnsJob;
-import org.apache.kylin.job.hadoop.invertedindex.InvertedIndexJob;
-import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity;
-import org.apache.kylin.source.hive.HiveMRInput.BatchCubingInputSide;
-
-import com.google.common.base.Preconditions;
-
-/**
- */
-public final class IIJobBuilder {
-
-    final JobEngineConfig engineConfig;
-
-    public IIJobBuilder(JobEngineConfig engineConfig) {
-        this.engineConfig = engineConfig;
-    }
-
-    public IIJob buildJob(IISegment seg, String submitter) {
-        checkPreconditions(seg);
-
-        IIJob result = initialJob(seg, "BUILD", submitter);
-        final String jobId = result.getId();
-        final IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(seg.getIIDesc());
-        final String intermediateTableIdentity = getIntermediateTableIdentity(intermediateTableDesc);
-        final String factDistinctColumnsPath = getIIDistinctColumnsPath(seg, jobId);
-        final String iiRootPath = getJobWorkingDir(jobId) + "/" + seg.getIIInstance().getName() + "/";
-        final String iiPath = iiRootPath + "*";
-
-        final AbstractExecutable intermediateHiveTableStep = createFlatHiveTableStep(intermediateTableDesc, jobId);
-        result.addTask(intermediateHiveTableStep);
-
-        result.addTask(createFactDistinctColumnsStep(seg, intermediateTableIdentity, jobId, factDistinctColumnsPath));
-
-        result.addTask(createBuildDictionaryStep(seg, factDistinctColumnsPath));
-
-        result.addTask(createInvertedIndexStep(seg, intermediateTableIdentity, iiRootPath));
-
-        // create htable step
-        result.addTask(createCreateHTableStep(seg));
-
-        // generate hfiles step
-        result.addTask(createConvertToHfileStep(seg, iiPath, jobId));
-
-        // bulk load step
-        result.addTask(createBulkLoadStep(seg, jobId));
-
-        return result;
-    }
-
-    private AbstractExecutable createFlatHiveTableStep(IIJoinedFlatTableDesc intermediateTableDesc, String jobId) {
-        return BatchCubingInputSide.createFlatHiveTableStep(engineConfig, intermediateTableDesc, jobId);
-    }
-
-    private IIJob initialJob(IISegment seg, String type, String submitter) {
-        IIJob result = new IIJob();
-        SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss");
-        format.setTimeZone(TimeZone.getTimeZone(engineConfig.getTimeZone()));
-        result.setIIName(seg.getIIInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setName(seg.getIIInstance().getName() + " - " + seg.getName() + " - " + type + " - " + format.format(new Date(System.currentTimeMillis())));
-        result.setSubmitter(submitter);
-        return result;
-    }
-
-    private void checkPreconditions(IISegment seg) {
-        Preconditions.checkNotNull(seg, "segment cannot be null");
-        Preconditions.checkNotNull(engineConfig, "jobEngineConfig cannot be null");
-    }
-
-    private void appendMapReduceParameters(StringBuilder builder, JobEngineConfig engineConfig) {
-        try {
-            String jobConf = engineConfig.getHadoopJobConfFilePath(RealizationCapacity.MEDIUM);
-            if (jobConf != null && jobConf.length() > 0) {
-                builder.append(" -conf ").append(jobConf);
-            }
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private String getIIDistinctColumnsPath(IISegment seg, String jobUuid) {
-        return getJobWorkingDir(jobUuid) + "/" + seg.getIIInstance().getName() + "/ii_distinct_columns";
-    }
-
-    private String getHFilePath(IISegment seg, String jobId) {
-        return getJobWorkingDir(jobId) + "/" + seg.getIIInstance().getName() + "/hfile/";
-    }
-
-    private MapReduceExecutable createFactDistinctColumnsStep(IISegment seg, String factTableName, String jobId, String output) {
-        MapReduceExecutable result = new MapReduceExecutable();
-        result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
-        result.setMapReduceJobClass(IIDistinctColumnsJob.class);
-        StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, engineConfig);
-        appendExecCmdParameters(cmd, "tablename", factTableName);
-        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-        appendExecCmdParameters(cmd, "output", output);
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getIIInstance().getName() + "_Step");
-
-        result.setMapReduceParams(cmd.toString());
-        return result;
-    }
-
-    private HadoopShellExecutable createBuildDictionaryStep(IISegment seg, String factDistinctColumnsPath) {
-        // base cuboid job
-        HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
-        buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
-        StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-        appendExecCmdParameters(cmd, "input", factDistinctColumnsPath);
-
-        buildDictionaryStep.setJobParams(cmd.toString());
-        buildDictionaryStep.setJobClass(CreateInvertedIndexDictionaryJob.class);
-        return buildDictionaryStep;
-    }
-
-    private MapReduceExecutable createInvertedIndexStep(IISegment seg, String intermediateHiveTable, String iiOutputTempPath) {
-        // base cuboid job
-        MapReduceExecutable buildIIStep = new MapReduceExecutable();
-
-        StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, engineConfig);
-
-        buildIIStep.setName(ExecutableConstants.STEP_NAME_BUILD_II);
-
-        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-        appendExecCmdParameters(cmd, "tablename", intermediateHiveTable);
-        appendExecCmdParameters(cmd, "output", iiOutputTempPath);
-        appendExecCmdParameters(cmd, "jobname", ExecutableConstants.STEP_NAME_BUILD_II);
-
-        buildIIStep.setMapReduceParams(cmd.toString());
-        buildIIStep.setMapReduceJobClass(InvertedIndexJob.class);
-        return buildIIStep;
-    }
-
-    private HadoopShellExecutable createCreateHTableStep(IISegment seg) {
-        HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
-        createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
-        StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-
-        createHtableStep.setJobParams(cmd.toString());
-        createHtableStep.setJobClass(IICreateHTableJob.class);
-
-        return createHtableStep;
-    }
-
-    private MapReduceExecutable createConvertToHfileStep(IISegment seg, String inputPath, String jobId) {
-        MapReduceExecutable createHFilesStep = new MapReduceExecutable();
-        createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_II_TO_HFILE);
-        StringBuilder cmd = new StringBuilder();
-
-        appendMapReduceParameters(cmd, engineConfig);
-        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-        appendExecCmdParameters(cmd, "input", inputPath);
-        appendExecCmdParameters(cmd, "output", getHFilePath(seg, jobId));
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getIIInstance().getName() + "_Step");
-
-        createHFilesStep.setMapReduceParams(cmd.toString());
-        createHFilesStep.setMapReduceJobClass(IICreateHFileJob.class);
-
-        return createHFilesStep;
-    }
-
-    private HadoopShellExecutable createBulkLoadStep(IISegment seg, String jobId) {
-        HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
-        bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
-
-        StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "input", getHFilePath(seg, jobId));
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-
-        bulkLoadStep.setJobParams(cmd.toString());
-        bulkLoadStep.setJobClass(IIBulkLoadJob.class);
-
-        return bulkLoadStep;
-
-    }
-
-    private StringBuilder appendExecCmdParameters(StringBuilder buf, String paraName, String paraValue) {
-        return buf.append(" -").append(paraName).append(" ").append(paraValue);
-    }
-
-    private String getJobWorkingDir(String uuid) {
-        return engineConfig.getHdfsWorkingDirectory() + "kylin-" + uuid;
-    }
-
-    private String getIntermediateTableIdentity(IIJoinedFlatTableDesc intermediateTableDesc) {
-        return engineConfig.getConfig().getHiveDatabaseForIntermediateTable() + "." + intermediateTableDesc.getTableName();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/main/java/org/apache/kylin/job/tools/IICLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/IICLI.java b/job/src/main/java/org/apache/kylin/job/tools/IICLI.java
deleted file mode 100644
index 8c39aa1..0000000
--- a/job/src/main/java/org/apache/kylin/job/tools/IICLI.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.tools;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.index.RawTableRecord;
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecord;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.apache.kylin.invertedindex.model.IIRow;
-
-/**
- * @author yangli9
- */
-public class IICLI {
-
-    public static void main(String[] args) throws IOException {
-        Configuration hconf = HadoopUtil.getCurrentConfiguration();
-        IIManager mgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
-
-        String iiName = args[0];
-        IIInstance ii = mgr.getII(iiName);
-
-        String path = args[1];
-        System.out.println("Reading from " + path + " ...");
-
-        TableRecordInfo info = new TableRecordInfo(ii.getFirstSegment());
-        IIKeyValueCodec codec = new IIKeyValueCodec(info.getDigest());
-        int count = 0;
-        for (Slice slice : codec.decodeKeyValue(readSequenceKVs(hconf, path))) {
-            for (RawTableRecord rec : slice) {
-                System.out.printf(new TableRecord(rec, info).toString());
-                count++;
-            }
-        }
-        System.out.println("Total " + count + " records");
-    }
-
-    public static Iterable<IIRow> readSequenceKVs(Configuration hconf, String path) throws IOException {
-        final Reader reader = new Reader(hconf, SequenceFile.Reader.file(new Path(path)));
-        return new Iterable<IIRow>() {
-            @Override
-            public Iterator<IIRow> iterator() {
-                return new Iterator<IIRow>() {
-                    ImmutableBytesWritable k = new ImmutableBytesWritable();
-                    ImmutableBytesWritable v = new ImmutableBytesWritable();
-                    IIRow pair = new IIRow(k, v, null);
-
-                    @Override
-                    public boolean hasNext() {
-                        boolean hasNext = false;
-                        try {
-                            hasNext = reader.next(k, v);
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
-                        } finally {
-                            if (hasNext == false) {
-                                IOUtils.closeQuietly(reader);
-                            }
-                        }
-                        return hasNext;
-                    }
-
-                    @Override
-                    public IIRow next() {
-                        return pair;
-                    }
-
-                    @Override
-                    public void remove() {
-                        throw new UnsupportedOperationException();
-                    }
-                };
-            }
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
deleted file mode 100644
index d7eb3cf..0000000
--- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.lang.reflect.Method;
-import java.text.SimpleDateFormat;
-import java.util.List;
-import java.util.TimeZone;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.engine.EngineFactory;
-import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
-import org.apache.kylin.job.manager.ExecutableManager;
-import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
-import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-public class BuildCubeWithEngineTest {
-
-    private CubeManager cubeManager;
-    private DefaultScheduler scheduler;
-    protected ExecutableManager jobService;
-
-    private static final Log logger = LogFactory.getLog(BuildCubeWithEngineTest.class);
-
-    protected void waitForJob(String jobId) {
-        while (true) {
-            AbstractExecutable job = jobService.getJob(jobId);
-            if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR) {
-                break;
-            } else {
-                try {
-                    Thread.sleep(5000);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
-
-    @BeforeClass
-    public static void beforeClass() throws Exception {
-        logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
-        System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this
-    }
-
-    @Before
-    public void before() throws Exception {
-        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
-
-        DeployUtil.initCliWorkDir();
-        DeployUtil.deployMetadata();
-        DeployUtil.overrideJobJarLocations();
-
-        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        jobService = ExecutableManager.getInstance(kylinConfig);
-        scheduler = DefaultScheduler.getInstance();
-        scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
-        if (!scheduler.hasStarted()) {
-            throw new RuntimeException("scheduler has not been started");
-        }
-        cubeManager = CubeManager.getInstance(kylinConfig);
-        for (String jobId : jobService.getAllJobIds()) {
-            if (jobService.getJob(jobId) instanceof CubingJob) {
-                jobService.deleteJob(jobId);
-            }
-        }
-
-    }
-
-    @After
-    public void after() {
-        HBaseMetadataTestCase.staticCleanupTestMetadata();
-    }
-
-    @Test
-    public void test() throws Exception {
-        DeployUtil.prepareTestDataForNormalCubes("test_kylin_cube_with_slr_left_join_empty");
-        testInner();
-        testLeft();
-    }
-
-    private void testInner() throws Exception {
-        String[] testCase = new String[] { "testInnerJoinCube", "testInnerJoinCube2", };
-        runTestAndAssertSucceed(testCase);
-    }
-
-    private void testLeft() throws Exception {
-        String[] testCase = new String[] { "testLeftJoinCube", "testLeftJoinCube2", };
-        runTestAndAssertSucceed(testCase);
-    }
-
-    private void runTestAndAssertSucceed(String[] testCase) throws Exception {
-        ExecutorService executorService = Executors.newFixedThreadPool(testCase.length);
-        final CountDownLatch countDownLatch = new CountDownLatch(testCase.length);
-        List<Future<List<String>>> tasks = Lists.newArrayListWithExpectedSize(testCase.length);
-        for (int i = 0; i < testCase.length; i++) {
-            tasks.add(executorService.submit(new TestCallable(testCase[i], countDownLatch)));
-        }
-        countDownLatch.await();
-        try {
-            for (int i = 0; i < tasks.size(); ++i) {
-                Future<List<String>> task = tasks.get(i);
-                final List<String> jobIds = task.get();
-                for (String jobId : jobIds) {
-                    assertJobSucceed(jobId);
-                }
-            }
-        } catch (Exception ex) {
-            logger.error(ex);
-            throw ex;
-        }
-    }
-
-    private void assertJobSucceed(String jobId) {
-        assertEquals("The job '" + jobId + "' is failed.", ExecutableState.SUCCEED, jobService.getOutput(jobId).getState());
-    }
-
-    private class TestCallable implements Callable<List<String>> {
-
-        private final String methodName;
-        private final CountDownLatch countDownLatch;
-
-        public TestCallable(String methodName, CountDownLatch countDownLatch) {
-            this.methodName = methodName;
-            this.countDownLatch = countDownLatch;
-        }
-
-        @SuppressWarnings("unchecked")
-        @Override
-        public List<String> call() throws Exception {
-            try {
-                final Method method = BuildCubeWithEngineTest.class.getDeclaredMethod(methodName);
-                method.setAccessible(true);
-                return (List<String>) method.invoke(BuildCubeWithEngineTest.this);
-            } finally {
-                countDownLatch.countDown();
-            }
-        }
-    }
-
-    @SuppressWarnings("unused")
-    // called by reflection
-    private List<String> testInnerJoinCube2() throws Exception {
-        clearSegment("test_kylin_cube_with_slr_empty");
-        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
-        f.setTimeZone(TimeZone.getTimeZone("GMT"));
-        long date1 = 0;
-        long date2 = f.parse("2013-01-01").getTime();
-        long date3 = f.parse("2022-01-01").getTime();
-        List<String> result = Lists.newArrayList();
-        result.add(buildSegment("test_kylin_cube_with_slr_empty", date1, date2));
-        result.add(buildSegment("test_kylin_cube_with_slr_empty", date2, date3));
-        return result;
-    }
-
-    @SuppressWarnings("unused")
-    // called by reflection
-    private List<String> testInnerJoinCube() throws Exception {
-        clearSegment("test_kylin_cube_without_slr_empty");
-
-        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
-        f.setTimeZone(TimeZone.getTimeZone("GMT"));
-
-        // this cube's start date is 0, end date is 20501112000000
-        long date1 = 0;
-        long date2 = f.parse("2050-01-11").getTime();
-
-        // this cube doesn't support incremental build, always do full build
-
-        List<String> result = Lists.newArrayList();
-        result.add(buildSegment("test_kylin_cube_without_slr_empty", date1, date2));
-        return result;
-    }
-
-    @SuppressWarnings("unused")
-    // called by reflection
-    private List<String> testLeftJoinCube2() throws Exception {
-        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
-        f.setTimeZone(TimeZone.getTimeZone("GMT"));
-        List<String> result = Lists.newArrayList();
-        final String cubeName = "test_kylin_cube_without_slr_left_join_empty";
-        // this cube's start date is 0, end date is 20120601000000
-        long dateStart = cubeManager.getCube(cubeName).getDescriptor().getModel().getPartitionDesc().getPartitionDateStart();
-        long dateEnd = f.parse("2012-06-01").getTime();
-
-        clearSegment(cubeName);
-        result.add(buildSegment(cubeName, dateStart, dateEnd));
-
-        // then submit an append job, start date is 20120601000000, end
-        // date is 20220101000000
-        dateStart = f.parse("2012-06-01").getTime();
-        dateEnd = f.parse("2022-01-01").getTime();
-        result.add(buildSegment(cubeName, dateStart, dateEnd));
-
-        // build an empty segment which doesn't have data
-        dateStart = f.parse("2022-01-01").getTime();
-        dateEnd = f.parse("2023-01-01").getTime();
-        result.add(buildSegment(cubeName, dateStart, dateEnd));
-
-        return result;
-
-    }
-
-    @SuppressWarnings("unused")
-    // called by reflection
-    private List<String> testLeftJoinCube() throws Exception {
-        String cubeName = "test_kylin_cube_with_slr_left_join_empty";
-        clearSegment(cubeName);
-
-        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
-        f.setTimeZone(TimeZone.getTimeZone("GMT"));
-        long dateStart = cubeManager.getCube(cubeName).getDescriptor().getModel().getPartitionDesc().getPartitionDateStart();
-        long dateEnd = f.parse("2050-11-12").getTime();
-
-        // this cube's start date is 0, end date is 20501112000000
-        List<String> result = Lists.newArrayList();
-        result.add(buildSegment(cubeName, dateStart, dateEnd));
-        return result;
-
-    }
-
-    private void clearSegment(String cubeName) throws Exception {
-        CubeInstance cube = cubeManager.getCube(cubeName);
-        // remove all existing segments
-        CubeUpdate cubeBuilder = new CubeUpdate(cube);
-        cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
-        cubeManager.updateCube(cubeBuilder);
-    }
-
-    private String buildSegment(String cubeName, long startDate, long endDate) throws Exception {
-        CubeSegment segment = cubeManager.appendSegments(cubeManager.getCube(cubeName), endDate);
-        DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
-        jobService.addJob(job);
-        waitForJob(job.getId());
-        return job.getId();
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
deleted file mode 100644
index b02b2f2..0000000
--- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.job;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.UUID;
-
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.job.streaming.BootstrapConfig;
-import org.apache.kylin.job.streaming.StreamingBootstrap;
-import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
-import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
-import org.apache.kylin.streaming.StreamingConfig;
-import org.apache.kylin.streaming.StreamingManager;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *  for streaming cubing case "test_streaming_table"
- */
-public class BuildCubeWithStreamTest {
-
-    private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStreamTest.class);
-    private static final String streamingName = "test_streaming_table_cube";
-    private static final long startTime = DateFormat.stringToMillis("2015-01-01 00:00:00");
-    private static final long endTime = DateFormat.stringToMillis("2015-01-03 00:00:00");
-    private static final long batchInterval = 16 * 60 * 60 * 1000;//16 hours
-
-    private KylinConfig kylinConfig;
-
-    @BeforeClass
-    public static void beforeClass() throws Exception {
-        logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
-        System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this
-    }
-
-    @Before
-    public void before() throws Exception {
-        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
-        DeployUtil.overrideJobJarLocations();
-
-        kylinConfig = KylinConfig.getInstanceFromEnv();
-
-        //Use a random toplic for kafka data stream
-        StreamingConfig streamingConfig = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingName);
-        streamingConfig.setTopic(UUID.randomUUID().toString());
-        StreamingManager.getInstance(kylinConfig).saveStreamingConfig(streamingConfig);
-
-        DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, streamingConfig);
-    }
-
-    @AfterClass
-    public static void afterClass() throws Exception {
-        backup();
-        HBaseMetadataTestCase.staticCleanupTestMetadata();
-    }
-
-    private static int cleanupOldStorage() throws Exception {
-        String[] args = { "--delete", "true" };
-        int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
-        return exitCode;
-    }
-
-    private static void backup() throws Exception {
-        int exitCode = cleanupOldStorage();
-        if (exitCode == 0) {
-            exportHBaseData();
-        }
-    }
-
-    private static void exportHBaseData() throws IOException {
-        ExportHBaseData export = new ExportHBaseData();
-        export.exportTables();
-        export.tearDown();
-    }
-
-    @Test
-    public void test() throws Exception {
-        for (long start = startTime; start < endTime; start += batchInterval) {
-            BootstrapConfig bootstrapConfig = new BootstrapConfig();
-            bootstrapConfig.setStart(start);
-            bootstrapConfig.setEnd(start + batchInterval);
-            bootstrapConfig.setOneOff(true);
-            bootstrapConfig.setPartitionId(0);
-            bootstrapConfig.setStreaming(streamingName);
-            StreamingBootstrap.getInstance(KylinConfig.getInstanceFromEnv()).start(bootstrapConfig);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java b/job/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
deleted file mode 100644
index fecb106..0000000
--- a/job/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.lang.reflect.Method;
-import java.text.SimpleDateFormat;
-import java.util.List;
-import java.util.TimeZone;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
-import org.apache.kylin.job.invertedindex.IIJob;
-import org.apache.kylin.job.invertedindex.IIJobBuilder;
-import org.apache.kylin.job.manager.ExecutableManager;
-import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
-import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
-import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-/**
- * @author shaoshi
- */
-public class BuildIIWithEngineTest {
-
-    private JobEngineConfig jobEngineConfig;
-    private IIManager iiManager;
-
-    private DefaultScheduler scheduler;
-    protected ExecutableManager jobService;
-
-    protected static final String[] TEST_II_INSTANCES = new String[] { "test_kylin_ii_inner_join", "test_kylin_ii_left_join" };
-
-    private static final Log logger = LogFactory.getLog(BuildIIWithEngineTest.class);
-
-    protected void waitForJob(String jobId) {
-        while (true) {
-            AbstractExecutable job = jobService.getJob(jobId);
-            if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR) {
-                break;
-            } else {
-                try {
-                    Thread.sleep(5000);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
-
-    @BeforeClass
-    public static void beforeClass() throws Exception {
-        logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this
-    }
-
-    @Before
-    public void before() throws Exception {
-        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
-
-        //DeployUtil.initCliWorkDir();
-        //        DeployUtil.deployMetadata();
-        DeployUtil.overrideJobJarLocations();
-
-        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        jobService = ExecutableManager.getInstance(kylinConfig);
-        scheduler = DefaultScheduler.getInstance();
-        scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
-        if (!scheduler.hasStarted()) {
-            throw new RuntimeException("scheduler has not been started");
-        }
-        jobEngineConfig = new JobEngineConfig(kylinConfig);
-        for (String jobId : jobService.getAllJobIds()) {
-            if (jobService.getJob(jobId) instanceof IIJob) {
-                jobService.deleteJob(jobId);
-            }
-        }
-
-        iiManager = IIManager.getInstance(kylinConfig);
-        for (String iiInstance : TEST_II_INSTANCES) {
-
-            IIInstance ii = iiManager.getII(iiInstance);
-            if (ii.getStatus() != RealizationStatusEnum.DISABLED) {
-                ii.setStatus(RealizationStatusEnum.DISABLED);
-                iiManager.updateII(ii);
-            }
-        }
-    }
-
-    @After
-    public void after() throws Exception {
-
-        for (String iiInstance : TEST_II_INSTANCES) {
-            IIInstance ii = iiManager.getII(iiInstance);
-            if (ii.getStatus() != RealizationStatusEnum.READY) {
-                ii.setStatus(RealizationStatusEnum.READY);
-                iiManager.updateII(ii);
-            }
-        }
-    }
-
-    @Test
-    @Ignore
-    public void testBuildII() throws Exception {
-
-        String[] testCase = new String[] { "buildIIInnerJoin", "buildIILeftJoin" };
-        ExecutorService executorService = Executors.newFixedThreadPool(testCase.length);
-        final CountDownLatch countDownLatch = new CountDownLatch(testCase.length);
-        List<Future<List<String>>> tasks = Lists.newArrayListWithExpectedSize(testCase.length);
-        for (int i = 0; i < testCase.length; i++) {
-            tasks.add(executorService.submit(new TestCallable(testCase[i], countDownLatch)));
-        }
-        countDownLatch.await();
-        for (int i = 0; i < tasks.size(); ++i) {
-            Future<List<String>> task = tasks.get(i);
-            final List<String> jobIds = task.get();
-            for (String jobId : jobIds) {
-                assertJobSucceed(jobId);
-            }
-        }
-
-    }
-
-    private void assertJobSucceed(String jobId) {
-        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(jobId).getState());
-    }
-
-    private class TestCallable implements Callable<List<String>> {
-
-        private final String methodName;
-        private final CountDownLatch countDownLatch;
-
-        public TestCallable(String methodName, CountDownLatch countDownLatch) {
-            this.methodName = methodName;
-            this.countDownLatch = countDownLatch;
-        }
-
-        @SuppressWarnings("unchecked")
-        @Override
-        public List<String> call() throws Exception {
-            try {
-                final Method method = BuildIIWithEngineTest.class.getDeclaredMethod(methodName);
-                method.setAccessible(true);
-                return (List<String>) method.invoke(BuildIIWithEngineTest.this);
-            } finally {
-                countDownLatch.countDown();
-            }
-        }
-    }
-
-    protected List<String> buildIIInnerJoin() throws Exception {
-        return buildII(TEST_II_INSTANCES[0]);
-    }
-
-    protected List<String> buildIILeftJoin() throws Exception {
-        return buildII(TEST_II_INSTANCES[1]);
-    }
-
-    protected List<String> buildII(String iiName) throws Exception {
-        clearSegment(iiName);
-
-        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
-        f.setTimeZone(TimeZone.getTimeZone("GMT"));
-
-        long date1 = 0;
-        long date2 = f.parse("2015-01-01").getTime();
-
-        List<String> result = Lists.newArrayList();
-        result.add(buildSegment(iiName, date1, date2));
-        return result;
-    }
-
-    private void clearSegment(String iiName) throws Exception {
-        IIInstance ii = iiManager.getII(iiName);
-        ii.getSegments().clear();
-        iiManager.updateII(ii);
-    }
-
-    private String buildSegment(String iiName, long startDate, long endDate) throws Exception {
-        IIInstance iiInstance = iiManager.getII(iiName);
-        IISegment segment = iiManager.buildSegment(iiInstance, startDate, endDate);
-        iiInstance.getSegments().add(segment);
-        iiManager.updateII(iiInstance);
-        IIJobBuilder iiJobBuilder = new IIJobBuilder(jobEngineConfig);
-        IIJob job = iiJobBuilder.buildJob(segment, "TEST");
-        jobService.addJob(job);
-        waitForJob(job.getId());
-        return job.getId();
-    }
-
-    private int cleanupOldStorage() throws Exception {
-        String[] args = { "--delete", "true" };
-
-        int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
-        return exitCode;
-    }
-
-    public static void main(String[] args) throws Exception {
-        BuildIIWithEngineTest instance = new BuildIIWithEngineTest();
-
-        BuildIIWithEngineTest.beforeClass();
-        instance.before();
-        instance.testBuildII();
-        instance.after();
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
deleted file mode 100644
index 5ca3b29..0000000
--- a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.job;
-
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.TimeZone;
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
-import org.apache.kylin.job.common.ShellExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.apache.kylin.source.hive.HiveTableReader;
-import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
-import org.apache.kylin.streaming.StreamBuilder;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.invertedindex.IIStreamConsumer;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-public class BuildIIWithStreamTest {
-
-    private static final Logger logger = LoggerFactory.getLogger(BuildIIWithStreamTest.class);
-
-    private static final String[] II_NAME = new String[] { "test_kylin_ii_left_join", "test_kylin_ii_inner_join" };
-    private IIManager iiManager;
-    private KylinConfig kylinConfig;
-
-    @BeforeClass
-    public static void beforeClass() throws Exception {
-        logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this
-    }
-
-    @Before
-    public void before() throws Exception {
-        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
-        DeployUtil.overrideJobJarLocations();
-
-        kylinConfig = KylinConfig.getInstanceFromEnv();
-        iiManager = IIManager.getInstance(kylinConfig);
-        iiManager = IIManager.getInstance(kylinConfig);
-        for (String iiInstance : II_NAME) {
-
-            IIInstance ii = iiManager.getII(iiInstance);
-            if (ii.getStatus() != RealizationStatusEnum.DISABLED) {
-                ii.setStatus(RealizationStatusEnum.DISABLED);
-                iiManager.updateII(ii);
-            }
-        }
-    }
-
-    private String createIntermediateTable(IIDesc desc, KylinConfig kylinConfig) throws IOException {
-        IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(desc);
-        JobEngineConfig jobEngineConfig = new JobEngineConfig(kylinConfig);
-        final String uuid = UUID.randomUUID().toString();
-        final String useDatabaseHql = "USE " + kylinConfig.getHiveDatabaseForIntermediateTable() + ";";
-        final String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc);
-        final String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, JobBuilderSupport.getJobWorkingDir(jobEngineConfig, uuid));
-        String insertDataHqls;
-        try {
-            insertDataHqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, jobEngineConfig);
-        } catch (IOException e1) {
-            e1.printStackTrace();
-            throw new RuntimeException("Failed to generate insert data SQL for intermediate table.");
-        }
-
-        ShellExecutable step = new ShellExecutable();
-        StringBuffer buf = new StringBuffer();
-        buf.append("hive -e \"");
-        buf.append(useDatabaseHql + "\n");
-        buf.append(dropTableHql + "\n");
-        buf.append(createTableHql + "\n");
-        buf.append(insertDataHqls + "\n");
-        buf.append("\"");
-
-        step.setCmd(buf.toString());
-        logger.info(step.getCmd());
-        step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
-        kylinConfig.getCliCommandExecutor().execute(step.getCmd(), null);
-        return intermediateTableDesc.getTableName();
-    }
-
-    private void clearSegment(String iiName) throws Exception {
-        IIInstance ii = iiManager.getII(iiName);
-        ii.getSegments().clear();
-        iiManager.updateII(ii);
-    }
-
-    private IISegment createSegment(String iiName) throws Exception {
-        clearSegment(iiName);
-        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
-        f.setTimeZone(TimeZone.getTimeZone("GMT"));
-
-        long date1 = 0;
-        long date2 = f.parse("2015-01-01").getTime();
-        return buildSegment(iiName, date1, date2);
-    }
-
-    private IISegment buildSegment(String iiName, long startDate, long endDate) throws Exception {
-        IIInstance iiInstance = iiManager.getII(iiName);
-        IISegment segment = iiManager.buildSegment(iiInstance, startDate, endDate);
-        iiInstance.getSegments().add(segment);
-        iiManager.updateII(iiInstance);
-        return segment;
-    }
-
-    private void buildII(String iiName) throws Exception {
-        final IIDesc desc = iiManager.getII(iiName).getDescriptor();
-        final String tableName = createIntermediateTable(desc, kylinConfig);
-        logger.info("intermediate table name:" + tableName);
-
-        HiveTableReader reader = new HiveTableReader("default", tableName);
-        final List<TblColRef> tblColRefs = desc.listAllColumns();
-        for (TblColRef tblColRef : tblColRefs) {
-            if (desc.isMetricsCol(tblColRef)) {
-                logger.info("matrix:" + tblColRef.getName());
-            } else {
-                logger.info("measure:" + tblColRef.getName());
-            }
-        }
-        LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<StreamMessage>();
-        final IISegment segment = createSegment(iiName);
-        String[] args = new String[] { "-iiname", iiName, "-htablename", segment.getStorageLocationIdentifier() };
-        ToolRunner.run(new IICreateHTableJob(), args);
-
-        ExecutorService executorService = Executors.newSingleThreadExecutor();
-        final StreamBuilder streamBuilder = StreamBuilder.newLimitedSizeStreamBuilder(iiName, queue, new IIStreamConsumer(iiName, segment.getStorageLocationIdentifier(), segment.getIIDesc(), 0), 0, segment.getIIDesc().getSliceSize());
-
-        List<String[]> sorted = getSortedRows(reader, desc.getTimestampColumn());
-        int count = sorted.size();
-        for (String[] row : sorted) {
-            logger.info("another row: " + StringUtils.join(row, ","));
-            queue.put(parse(row));
-        }
-
-        reader.close();
-        logger.info("total record count:" + count + " htable:" + segment.getStorageLocationIdentifier());
-        queue.put(StreamMessage.EOF);
-        final Future<?> future = executorService.submit(streamBuilder);
-        try {
-            future.get();
-        } catch (Exception e) {
-            logger.error("stream build failed", e);
-            fail("stream build failed");
-        }
-
-        logger.info("stream build finished, htable name:" + segment.getStorageLocationIdentifier());
-    }
-
-    @Test
-    public void test() throws Exception {
-        for (String iiName : II_NAME) {
-            buildII(iiName);
-            IIInstance ii = iiManager.getII(iiName);
-            if (ii.getStatus() != RealizationStatusEnum.READY) {
-                ii.setStatus(RealizationStatusEnum.READY);
-                iiManager.updateII(ii);
-            }
-        }
-    }
-
-    private StreamMessage parse(String[] row) {
-        return new StreamMessage(System.currentTimeMillis(), StringUtils.join(row, ",").getBytes());
-    }
-
-    private List<String[]> getSortedRows(HiveTableReader reader, final int tsCol) throws IOException {
-        List<String[]> unsorted = Lists.newArrayList();
-        while (reader.next()) {
-            unsorted.add(reader.getRow());
-        }
-        Collections.sort(unsorted, new Comparator<String[]>() {
-            @Override
-            public int compare(String[] o1, String[] o2) {
-                long t1 = DateFormat.stringToMillis(o1[tsCol]);
-                long t2 = DateFormat.stringToMillis(o2[tsCol]);
-                return Long.compare(t1, t2);
-            }
-        });
-        return unsorted;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/DataGenTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/DataGenTest.java b/job/src/test/java/org/apache/kylin/job/DataGenTest.java
deleted file mode 100644
index 5c01305..0000000
--- a/job/src/test/java/org/apache/kylin/job/DataGenTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job;
-
-import static org.junit.Assert.assertTrue;
-
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.job.dataGen.FactTableGenerator;
-import org.apache.kylin.metadata.MetadataManager;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- *
- */
-public class DataGenTest extends LocalFileMetadataTestCase {
-
-    @Before
-    public void before() throws Exception {
-        this.createTestMetadata();
-        MetadataManager.clearCache();
-    }
-
-    @After
-    public void after() throws Exception {
-        this.cleanupTestMetadata();
-    }
-
-    @Test
-    public void testBasics() throws Exception {
-        String content = FactTableGenerator.generate("test_kylin_cube_with_slr_ready", "10000", "1", null);// default  settings
-        System.out.println(content);
-        assertTrue(content.contains("FP-non GTC"));
-        assertTrue(content.contains("ABIN"));
-
-        DeployUtil.overrideFactTableData(content, "default.test_kylin_fact");
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/job/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java b/job/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java
deleted file mode 100644
index 7f12069..0000000
--- a/job/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job;
-
-import java.io.File;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-
-/**
- * This test case is ONLY for dev use, it deploys local meta to sandbox
- */
-@Ignore("dev use only")
-public class DeployLocalMetaToRemoteTest {
-
-    private static final Log logger = LogFactory.getLog(DeployLocalMetaToRemoteTest.class);
-
-    @BeforeClass
-    public static void beforeClass() throws Exception {
-        logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
-        System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this
-    }
-
-    @Before
-    public void before() throws Exception {
-        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
-
-        DeployUtil.initCliWorkDir();
-        DeployUtil.deployMetadata();
-        DeployUtil.overrideJobJarLocations();
-
-    }
-
-    @After
-    public void after() {
-        HBaseMetadataTestCase.staticCleanupTestMetadata();
-    }
-
-    @Test
-    public void test() throws Exception {
-        System.out.println("blank");
-    }
-
-}
\ No newline at end of file