You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/11/06 09:28:36 UTC

[2/2] incubator-kylin git commit: KYLIN-1116 Use local dictionary for InvertedIndex batch building

KYLIN-1116 Use local dictionary for InvertedIndex batch building

Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/82bfa924
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/82bfa924
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/82bfa924

Branch: refs/heads/KYLIN-1116
Commit: 82bfa924c48db87297be6a7720357b500d48511e
Parents: f8590d2
Author: shaofengshi <sh...@apache.org>
Authored: Fri Nov 6 16:24:29 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Nov 6 16:24:29 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/job/BuildIIWithStreamTest.java |  15 +-
 .../kylin/job/hadoop/invertedindex/IITest.java  |  11 +-
 .../kylin/common/util/StreamingBatch.java       |  59 +++++
 .../kylin/common/util/StreamingMessage.java     |  43 ++++
 .../realization/IRealizationSegment.java        |   1 -
 .../kylin/engine/mr/JobBuilderSupport.java      |   8 +-
 .../engine/mr/common/AbstractHadoopJob.java     |   3 -
 .../mr/invertedindex/BatchIIJobBuilder.java     |  61 +-----
 .../CreateInvertedIndexDictionaryJob.java       |  70 ------
 .../IIDistinctColumnsCombiner.java              |  58 -----
 .../mr/invertedindex/IIDistinctColumnsJob.java  | 138 ------------
 .../invertedindex/IIDistinctColumnsMapper.java  |  75 -------
 .../invertedindex/IIDistinctColumnsReducer.java |  77 -------
 .../engine/mr/invertedindex/IIJobBuilder.java   | 219 -------------------
 .../mr/invertedindex/InvertedIndexJob.java      |  10 +-
 .../mr/invertedindex/InvertedIndexMapper.java   |  32 +--
 .../mr/invertedindex/InvertedIndexReducer.java  |  89 +++++---
 .../UpdateIIInfoAfterBuildStep.java             |  84 +++++++
 .../UpdateInvertedIndexInfoAfterBuildStep.java  |  93 --------
 .../kylin/engine/streaming/IStreamingInput.java |   2 +
 .../streaming/OneOffStreamingBuilder.java       |   1 +
 .../kylin/engine/streaming/StreamingBatch.java  |  61 ------
 .../engine/streaming/StreamingBatchBuilder.java |   1 +
 .../engine/streaming/StreamingMessage.java      |  43 ----
 .../streaming/cube/StreamingCubeBuilder.java    |   4 +-
 .../streaming/invertedindex/SliceBuilder.java   |  81 -------
 .../invertedindex/test_kylin_ii_inner_join.json |  45 +---
 .../invertedindex/test_kylin_ii_left_join.json  |  45 +---
 .../invertedindex/test_streaming_table_ii.json  |  22 +-
 .../apache/kylin/invertedindex/IIManager.java   |  50 +----
 .../apache/kylin/invertedindex/IISegment.java   |  51 +----
 .../kylin/invertedindex/index/SliceBuilder.java |  77 +++++++
 .../invertedindex/IIInstanceTest.java           |   5 -
 .../apache/kylin/source/hive/HiveMRInput.java   |   3 +
 .../kylin/source/kafka/KafkaStreamingInput.java |   4 +-
 .../kylin/source/kafka/StreamingParser.java     |   2 +-
 .../source/kafka/StringStreamingParser.java     |   2 +-
 .../source/kafka/TimedJsonStreamParser.java     |   2 +-
 .../kafka/diagnose/KafkaInputAnalyzer.java      |   2 +-
 .../kylin/source/kafka/util/KafkaUtils.java     |   2 +-
 .../storage/hbase/ii/IICreateHFileMapper.java   |  19 +-
 .../storage/hbase/ii/IICreateHTableJob.java     |  31 ++-
 .../storage/hbase/steps/HBaseMROutput.java      |   2 +-
 .../kylin/storage/hbase/steps/HBaseMRSteps.java |  18 +-
 .../hbase/util/DeployCoprocessorCLI.java        |   3 +-
 .../storage/hbase/util/StorageCleanupJob.java   |   2 -
 .../hbase/common/TsConditionEraserTest.java     |   6 +-
 .../endpoint/EndpointAggregationTest.java       |  27 ++-
 .../endpoint/TableRecordInfoTest.java           |   5 +
 .../endpoint/TsConditionExtractorTest.java      |   5 +
 50 files changed, 469 insertions(+), 1300 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index b64a7c5..2d40b09 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -51,8 +51,8 @@ import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
-import org.apache.kylin.engine.streaming.StreamingBatch;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingBatch;
+import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
@@ -61,7 +61,7 @@ import org.apache.kylin.invertedindex.model.IIDesc;
 import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
 import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
 import org.apache.kylin.invertedindex.model.IIRow;
-import org.apache.kylin.engine.streaming.invertedindex.SliceBuilder;
+import org.apache.kylin.invertedindex.index.SliceBuilder;
 import org.apache.kylin.job.common.ShellExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
@@ -196,14 +196,13 @@ public class BuildIIWithStreamTest {
         int count = sorted.size();
         ArrayList<StreamingMessage> messages = Lists.newArrayList();
         for (String[] row : sorted) {
-            if (messages.size() < iiDesc.getSliceSize()) {
-                messages.add(parse(row));
-            } else {
+            messages.add((parse(row)));
+            if (messages.size() >= iiDesc.getSliceSize()) {
                 build(sliceBuilder, new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable);
-                messages = Lists.newArrayList();
-                messages.add((parse(row)));
+                messages.clear();
             }
         }
+        
         if (!messages.isEmpty()) {
             build(sliceBuilder, new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable);
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index 200156a..a393ce3 100644
--- a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -23,10 +23,11 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.kylin.common.util.FIFOIterable;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.engine.streaming.StreamingBatch;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingBatch;
+import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.invertedindex.index.Slice;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
@@ -35,7 +36,7 @@ import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
 import org.apache.kylin.invertedindex.model.IIKeyValueCodecWithState;
 import org.apache.kylin.invertedindex.model.IIRow;
 import org.apache.kylin.invertedindex.model.KeyValueCodec;
-import org.apache.kylin.engine.streaming.invertedindex.SliceBuilder;
+import org.apache.kylin.invertedindex.index.SliceBuilder;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.ConstantTupleFilter;
@@ -146,6 +147,10 @@ public class IITest extends LocalFileMetadataTestCase {
     @Test
     public void IIEndpointTest() {
         TableRecordInfo info = new TableRecordInfo(ii.getDescriptor());
+        if (ii.getFirstSegment() == null) {
+            IISegment segment = IIManager.getInstance(getTestConfig()).buildSegment(ii, 0, System.currentTimeMillis());
+            ii.getSegments().add(segment);
+        }
         CoprocessorRowType type = CoprocessorRowType.fromTableRecordInfo(info, ii.getFirstSegment().getColumns());
         CoprocessorProjector projector = CoprocessorProjector.makeForEndpoint(info, Collections.singletonList(ii.getDescriptor().findColumnRef("default.test_kylin_fact", "lstg_format_name")));
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/core-common/src/main/java/org/apache/kylin/common/util/StreamingBatch.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/StreamingBatch.java b/core-common/src/main/java/org/apache/kylin/common/util/StreamingBatch.java
new file mode 100644
index 0000000..ae5b72f
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/StreamingBatch.java
@@ -0,0 +1,59 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+package org.apache.kylin.common.util;
+
+import java.util.List;
+
+/**
+ */
+public final class StreamingBatch {
+
+    private final List<StreamingMessage> messages;
+
+    private final Pair<Long, Long> timeRange;
+
+    public StreamingBatch(List<StreamingMessage> messages, Pair<Long, Long> timeRange) {
+        this.messages = messages;
+        this.timeRange = timeRange;
+    }
+
+    public List<StreamingMessage> getMessages() {
+        return messages;
+    }
+
+    public Pair<Long, Long> getTimeRange() {
+        return timeRange;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
new file mode 100644
index 0000000..2c150ff
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
@@ -0,0 +1,43 @@
+package org.apache.kylin.common.util;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ */
+public class StreamingMessage {
+
+    private final List<String> data;
+
+    private long offset;
+
+    private long timestamp;
+
+    private Map<String, Object> params;
+
+    public static final StreamingMessage EOF = new StreamingMessage(Collections.<String> emptyList(), 0L, 0L, Collections.<String, Object> emptyMap());
+
+    public StreamingMessage(List<String> data, long offset, long timestamp, Map<String, Object> params) {
+        this.data = data;
+        this.offset = offset;
+        this.timestamp = timestamp;
+        this.params = params;
+    }
+
+    public final List<String> getData() {
+        return data;
+    }
+
+    public final long getOffset() {
+        return offset;
+    }
+
+    public final long getTimestamp() {
+        return timestamp;
+    }
+
+    public Map<String, Object> getParams() {
+        return params;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationSegment.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationSegment.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationSegment.java
index afab86b..3ac82a0 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationSegment.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationSegment.java
@@ -4,7 +4,6 @@ import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 
 /**
- * Created by shaoshi on 10/30/15.
  */
 public interface IRealizationSegment extends IBuildable {
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 8c770f9..e3b07d8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -25,13 +25,12 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
-import org.apache.kylin.engine.mr.invertedindex.UpdateInvertedIndexInfoAfterBuildStep;
+import org.apache.kylin.engine.mr.invertedindex.UpdateIIInfoAfterBuildStep;
 import org.apache.kylin.engine.mr.steps.CreateDictionaryJob;
 import org.apache.kylin.engine.mr.steps.FactDistinctColumnsJob;
 import org.apache.kylin.engine.mr.steps.MergeDictionaryStep;
 import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterBuildStep;
 import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterMergeStep;
-import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
 
@@ -125,11 +124,10 @@ public class JobBuilderSupport {
 
 
 
-    public UpdateInvertedIndexInfoAfterBuildStep createUpdateInvertedIndexInfoAfterBuildStep(String jobId) {
-        final UpdateInvertedIndexInfoAfterBuildStep updateIIInfoStep = new UpdateInvertedIndexInfoAfterBuildStep();
+    public UpdateIIInfoAfterBuildStep createUpdateIIInfoAfterBuildStep(String jobId) {
+        final UpdateIIInfoAfterBuildStep updateIIInfoStep = new UpdateIIInfoAfterBuildStep();
         updateIIInfoStep.setName(ExecutableConstants.STEP_NAME_UPDATE_II_INFO);
         updateIIInfoStep.setInvertedIndexName(seg.getRealization().getName());
-        updateIIInfoStep.setSegmentId(seg.getUuid());
         updateIIInfoStep.setJobId(jobId);
         return updateIIInfoStep;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 8782bbe..366a730 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -337,9 +337,6 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
             TableDesc table = metaMgr.getTableDesc(tableName);
             dumpList.add(table.getResourcePath());
         }
-        for (IISegment segment : ii.getSegments()) {
-            dumpList.addAll(segment.getDictionaryPaths());
-        }
 
         attachKylinPropsAndMetadata(dumpList, conf);
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java
index 97e27d0..e7501b8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java
@@ -18,30 +18,16 @@
 
 package org.apache.kylin.engine.mr.invertedindex;
 
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
 import org.apache.kylin.engine.mr.IMROutput;
-import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
-import org.apache.kylin.engine.mr.steps.BaseCuboidJob;
-import org.apache.kylin.engine.mr.steps.NDCuboidJob;
 import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
 import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.metadata.realization.IRealizationSegment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
 public class BatchIIJobBuilder extends JobBuilderSupport {
     
     private static final Logger logger = LoggerFactory.getLogger(BatchIIJobBuilder.class);
@@ -64,54 +50,20 @@ public class BatchIIJobBuilder extends JobBuilderSupport {
         final String iiRootPath = getRealizationRootPath(jobId) + "/";
         // Phase 1: Create Flat Table
         inputSide.addStepPhase1_CreateFlatTable(result);
-
-        final String intermediateTableIdentity = seg.getJoinedFlatTableDesc().getTableName();
-        // Phase 2: Build Dictionary
-        result.addTask(createIIFactDistinctColumnsStep(seg, intermediateTableIdentity, getFactDistinctColumnsPath(jobId)));
-        result.addTask(createIIBuildDictionaryStep(seg, getFactDistinctColumnsPath(jobId)));
-
-        // Phase 3: Build Cube
-        result.addTask(createInvertedIndexStep((IISegment)seg, intermediateTableIdentity, iiRootPath));
+    
+        // Phase 2: Build Inverted Index
+        result.addTask(createInvertedIndexStep((IISegment)seg, iiRootPath));
         outputSide.addStepPhase3_BuildII(result, iiRootPath);
 
-        // Phase 4: Update Metadata & Cleanup
-        result.addTask(createUpdateInvertedIndexInfoAfterBuildStep(jobId));
+        // Phase 3: Update Metadata & Cleanup
+        result.addTask(createUpdateIIInfoAfterBuildStep(jobId));
         inputSide.addStepPhase4_Cleanup(result);
         outputSide.addStepPhase4_Cleanup(result);
 
         return result;
     }
 
-    private MapReduceExecutable createIIFactDistinctColumnsStep(IRealizationSegment seg, String factTableName, String output) {
-        MapReduceExecutable result = new MapReduceExecutable();
-        result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
-        result.setMapReduceJobClass(IIDistinctColumnsJob.class);
-        StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc());
-        appendExecCmdParameters(cmd, "tablename", factTableName);
-        appendExecCmdParameters(cmd, "iiname", seg.getRealization().getName());
-        appendExecCmdParameters(cmd, "output", output);
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getRealization().getName() + "_Step");
-
-        result.setMapReduceParams(cmd.toString());
-        return result;
-    }
-
-    private HadoopShellExecutable createIIBuildDictionaryStep(IRealizationSegment seg, String factDistinctColumnsPath) {
-        // base cuboid job
-        HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
-        buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
-        StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "iiname", seg.getRealization().getName());
-        appendExecCmdParameters(cmd, "input", factDistinctColumnsPath);
-
-        buildDictionaryStep.setJobParams(cmd.toString());
-        buildDictionaryStep.setJobClass(CreateInvertedIndexDictionaryJob.class);
-        return buildDictionaryStep;
-    }
-
-    private MapReduceExecutable createInvertedIndexStep(IISegment seg, String intermediateHiveTable, String iiOutputTempPath) {
-        // base cuboid job
+    private MapReduceExecutable createInvertedIndexStep(IISegment seg, String iiOutputTempPath) {
         MapReduceExecutable buildIIStep = new MapReduceExecutable();
 
         StringBuilder cmd = new StringBuilder();
@@ -120,7 +72,6 @@ public class BatchIIJobBuilder extends JobBuilderSupport {
         buildIIStep.setName(ExecutableConstants.STEP_NAME_BUILD_II);
 
         appendExecCmdParameters(cmd, "iiname", seg.getRealization().getName());
-        appendExecCmdParameters(cmd, "tablename", intermediateHiveTable);
         appendExecCmdParameters(cmd, "output", iiOutputTempPath);
         appendExecCmdParameters(cmd, "jobname", ExecutableConstants.STEP_NAME_BUILD_II);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/CreateInvertedIndexDictionaryJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/CreateInvertedIndexDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/CreateInvertedIndexDictionaryJob.java
deleted file mode 100644
index 39d74b4..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/CreateInvertedIndexDictionaryJob.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.invertedindex;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.dict.DistinctColumnValuesProvider;
-import org.apache.kylin.engine.mr.DFSFileTable;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.ReadableTable;
-
-/**
- */
-public class CreateInvertedIndexDictionaryJob extends AbstractHadoopJob {
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_II_NAME);
-            options.addOption(OPTION_INPUT_PATH);
-            parseOptions(options, args);
-
-            final String iiname = getOptionValue(OPTION_II_NAME);
-            final String factColumnsInputPath = getOptionValue(OPTION_INPUT_PATH);
-            final KylinConfig config = KylinConfig.getInstanceFromEnv();
-
-            IIManager mgr = IIManager.getInstance(config);
-            IIInstance ii = mgr.getII(iiname);
-
-            mgr.buildInvertedIndexDictionary(ii.getFirstSegment(), new DistinctColumnValuesProvider() {
-                @Override
-                public ReadableTable getDistinctValuesFor(TblColRef col) {
-                    return new DFSFileTable(factColumnsInputPath + "/" + col.getName(), -1);
-                }
-            });
-            return 0;
-        } catch (Exception e) {
-            printUsage(options);
-            throw e;
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        int exitCode = ToolRunner.run(new CreateInvertedIndexDictionaryJob(), args);
-        System.exit(exitCode);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsCombiner.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsCombiner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsCombiner.java
deleted file mode 100644
index 651ad63..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsCombiner.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.invertedindex;
-
-import java.io.IOException;
-import java.util.HashSet;
-
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.engine.mr.KylinReducer;
-
-/**
- * @author yangli9
- */
-public class IIDistinctColumnsCombiner extends KylinReducer<ShortWritable, Text, ShortWritable, Text> {
-
-    private Text outputValue = new Text();
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-
-    }
-
-    @Override
-    public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
-
-        HashSet<ByteArray> set = new HashSet<ByteArray>();
-        for (Text textValue : values) {
-            ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
-            set.add(value);
-        }
-
-        for (ByteArray value : set) {
-            outputValue.set(value.array(), value.offset(), value.length());
-            context.write(key, outputValue);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsJob.java
deleted file mode 100644
index fe968b1..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsJob.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.invertedindex;
-
-import java.io.IOException;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.mr.IMRInput;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.IntermediateColumnDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author yangli9
- */
-public class IIDistinctColumnsJob extends AbstractHadoopJob {
-    protected static final Logger logger = LoggerFactory.getLogger(IIDistinctColumnsJob.class);
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_JOB_NAME);
-            options.addOption(OPTION_TABLE_NAME);
-            options.addOption(OPTION_II_NAME);
-            options.addOption(OPTION_OUTPUT_PATH);
-            parseOptions(options, args);
-
-            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
-            String tableName = getOptionValue(OPTION_TABLE_NAME).toUpperCase();
-            String iiName = getOptionValue(OPTION_II_NAME);
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-
-            // ----------------------------------------------------------------------------
-
-            logger.info("Starting: " + job.getJobName() + " on table " + tableName);
-
-            IIManager iiMgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
-            IIInstance ii = iiMgr.getII(iiName);
-            job.getConfiguration().set(BatchConstants.TABLE_NAME, tableName);
-            job.getConfiguration().set(BatchConstants.TABLE_COLUMNS, getColumns(ii));
-
-            setJobClasspath(job);
-
-            
-            setupMapper(ii.getFirstSegment());
-            setupReducer(output);
-
-            Configuration conf = job.getConfiguration();
-            conf.set(BatchConstants.CFG_II_NAME, ii.getName());
-            attachKylinPropsAndMetadata(ii, conf);
-            return waitForCompletion(job);
-
-        } catch (Exception e) {
-            printUsage(options);
-            throw e;
-        }
-
-    }
-
-    private String getColumns(IIInstance ii) {
-        IIJoinedFlatTableDesc iiflat = new IIJoinedFlatTableDesc(ii.getDescriptor());
-        StringBuilder buf = new StringBuilder();
-        for (IntermediateColumnDesc col : iiflat.getColumnList()) {
-            if (buf.length() > 0)
-                buf.append(",");
-            buf.append(col.getColumnName());
-        }
-        return buf.toString();
-    }
-
-    private void setupMapper(IISegment segment) throws IOException {
-        
-        IMRInput.IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment).getFlatTableInputFormat();
-        flatTableInputFormat.configureJob(job);
-
-        job.setMapperClass(IIDistinctColumnsMapper.class);
-        job.setCombinerClass(IIDistinctColumnsCombiner.class);
-        job.setMapOutputKeyClass(ShortWritable.class);
-        job.setMapOutputValueClass(Text.class);
-    }
-
-    private void setupReducer(Path output) throws IOException {
-        job.setReducerClass(IIDistinctColumnsReducer.class);
-        job.setOutputFormatClass(SequenceFileOutputFormat.class);
-        job.setOutputKeyClass(NullWritable.class);
-        job.setOutputValueClass(Text.class);
-
-        FileOutputFormat.setOutputPath(job, output);
-        job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
-
-        job.setNumReduceTasks(1);
-
-        deletePath(job.getConfiguration(), output);
-    }
-
-    public static void main(String[] args) throws Exception {
-        IIDistinctColumnsJob job = new IIDistinctColumnsJob();
-        int exitCode = ToolRunner.run(job, args);
-        System.exit(exitCode);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsMapper.java
deleted file mode 100644
index c431ecd..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsMapper.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.invertedindex;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.engine.mr.IMRInput;
-import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-
-/**
- * @author yangli9
- */
-public class IIDistinctColumnsMapper<KEYIN> extends KylinMapper<KEYIN, Object, ShortWritable, Text> {
-
-    private ShortWritable outputKey = new ShortWritable();
-    private Text outputValue = new Text();
-
-    protected IMRInput.IMRTableInputFormat flatTableInputFormat;
-    
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-        Configuration conf = context.getConfiguration();
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-
-        String iiName = conf.get(BatchConstants.CFG_II_NAME);
-        IIInstance iiInstance = IIManager.getInstance(config).getII(iiName);
-        IISegment seg = iiInstance.getFirstSegment();
-        flatTableInputFormat = MRUtil.getBatchCubingInputSide(seg).getFlatTableInputFormat();
-    }
-
-    @Override
-    public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
-
-        String[] row = flatTableInputFormat.parseMapperInput(record);
-        
-        for (short i = 0; i < row.length; i++) {
-            outputKey.set(i);
-            if (row[i] == null)
-                continue;
-            byte[] bytes = Bytes.toBytes(row[i].toString());
-            outputValue.set(bytes, 0, bytes.length);
-            context.write(outputKey, outputValue);
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsReducer.java
deleted file mode 100644
index d50385f..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsReducer.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.invertedindex;
-
-import java.io.IOException;
-import java.util.HashSet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.engine.mr.KylinReducer;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-
-/**
- * @author yangli9
- */
-public class IIDistinctColumnsReducer extends KylinReducer<ShortWritable, Text, NullWritable, Text> {
-
-    private String[] columns;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-
-        Configuration conf = context.getConfiguration();
-        this.columns = conf.get(BatchConstants.TABLE_COLUMNS).split(",");
-    }
-
-    @Override
-    public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
-        String columnName = columns[key.get()];
-
-        HashSet<ByteArray> set = new HashSet<ByteArray>();
-        for (Text textValue : values) {
-            ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
-            set.add(value);
-        }
-
-        Configuration conf = context.getConfiguration();
-        FileSystem fs = FileSystem.get(conf);
-        String outputPath = conf.get(BatchConstants.OUTPUT_PATH);
-        FSDataOutputStream out = fs.create(new Path(outputPath, columnName));
-
-        try {
-            for (ByteArray value : set) {
-                out.write(value.array(), value.offset(), value.length());
-                out.write('\n');
-            }
-        } finally {
-            out.close();
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJobBuilder.java
deleted file mode 100644
index 18d3001..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJobBuilder.java
+++ /dev/null
@@ -1,219 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// * 
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// * 
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-//*/
-//
-//package org.apache.kylin.engine.mr.invertedindex;
-//
-//import java.io.IOException;
-//import java.text.SimpleDateFormat;
-//import java.util.Date;
-//import java.util.TimeZone;
-//
-//import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
-//import org.apache.kylin.engine.mr.common.MapReduceExecutable;
-//import org.apache.kylin.invertedindex.IISegment;
-//import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
-//import org.apache.kylin.job.constant.ExecutableConstants;
-//import org.apache.kylin.job.engine.JobEngineConfig;
-//import org.apache.kylin.job.execution.AbstractExecutable;
-//import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity;
-//
-//import com.google.common.base.Preconditions;
-//
-///**
-// */
-//public final class IIJobBuilder {
-//
-//    final JobEngineConfig engineConfig;
-//
-//    public IIJobBuilder(JobEngineConfig engineConfig) {
-//        this.engineConfig = engineConfig;
-//    }
-//
-//    public IIJob buildJob(IISegment seg, String submitter) {
-//        checkPreconditions(seg);
-//
-//        IIJob result = initialJob(seg, "BUILD", submitter);
-//        final String jobId = result.getId();
-//        final IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(seg.getIIDesc());
-//        final String intermediateTableIdentity = getIntermediateTableIdentity(intermediateTableDesc);
-//        final String factDistinctColumnsPath = getIIDistinctColumnsPath(seg, jobId);
-//        final String iiRootPath = getJobWorkingDir(jobId) + "/" + seg.getIIInstance().getName() + "/";
-//        final String iiPath = iiRootPath + "*";
-//
-//        final AbstractExecutable intermediateHiveTableStep = createFlatHiveTableStep(intermediateTableDesc, jobId);
-//        result.addTask(intermediateHiveTableStep);
-//
-//        result.addTask(createFactDistinctColumnsStep(seg, intermediateTableIdentity, jobId, factDistinctColumnsPath));
-//
-//        result.addTask(createBuildDictionaryStep(seg, factDistinctColumnsPath));
-//
-//        result.addTask(createInvertedIndexStep(seg, intermediateTableIdentity, iiRootPath));
-//
-//        // create htable step
-//        result.addTask(createCreateHTableStep(seg));
-//
-//        // generate hfiles step
-//        result.addTask(createConvertToHfileStep(seg, iiPath, jobId));
-//
-//        // bulk load step
-//        result.addTask(createBulkLoadStep(seg, jobId));
-//
-//        return result;
-//    }
-//
-//    private IIJob initialJob(IISegment seg, String type, String submitter) {
-//        IIJob result = new IIJob();
-//        SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss");
-//        format.setTimeZone(TimeZone.getTimeZone(engineConfig.getTimeZone()));
-//        result.setIIName(seg.getIIInstance().getName());
-//        result.setSegmentId(seg.getUuid());
-//        result.setName(seg.getIIInstance().getName() + " - " + seg.getName() + " - " + type + " - " + format.format(new Date(System.currentTimeMillis())));
-//        result.setSubmitter(submitter);
-//        return result;
-//    }
-//
-//    private void checkPreconditions(IISegment seg) {
-//        Preconditions.checkNotNull(seg, "segment cannot be null");
-//        Preconditions.checkNotNull(engineConfig, "jobEngineConfig cannot be null");
-//    }
-//
-//    private void appendMapReduceParameters(StringBuilder builder, JobEngineConfig engineConfig) {
-//        try {
-//            String jobConf = engineConfig.getHadoopJobConfFilePath(RealizationCapacity.MEDIUM);
-//            if (jobConf != null && jobConf.length() > 0) {
-//                builder.append(" -conf ").append(jobConf);
-//            }
-//        } catch (IOException e) {
-//            throw new RuntimeException(e);
-//        }
-//    }
-//
-//    private String getIIDistinctColumnsPath(IISegment seg, String jobUuid) {
-//        return getJobWorkingDir(jobUuid) + "/" + seg.getIIInstance().getName() + "/ii_distinct_columns";
-//    }
-//
-//    private String getHFilePath(IISegment seg, String jobId) {
-//        return getJobWorkingDir(jobId) + "/" + seg.getIIInstance().getName() + "/hfile/";
-//    }
-//
-//    private MapReduceExecutable createFactDistinctColumnsStep(IISegment seg, String factTableName, String jobId, String output) {
-//        MapReduceExecutable result = new MapReduceExecutable();
-//        result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
-//        result.setMapReduceJobClass(IIDistinctColumnsJob.class);
-//        StringBuilder cmd = new StringBuilder();
-//        appendMapReduceParameters(cmd, engineConfig);
-//        appendExecCmdParameters(cmd, "tablename", factTableName);
-//        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-//        appendExecCmdParameters(cmd, "output", output);
-//        appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getIIInstance().getName() + "_Step");
-//
-//        result.setMapReduceParams(cmd.toString());
-//        return result;
-//    }
-//
-//    private HadoopShellExecutable createBuildDictionaryStep(IISegment seg, String factDistinctColumnsPath) {
-//        // base cuboid job
-//        HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
-//        buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
-//        StringBuilder cmd = new StringBuilder();
-//        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-//        appendExecCmdParameters(cmd, "input", factDistinctColumnsPath);
-//
-//        buildDictionaryStep.setJobParams(cmd.toString());
-//        buildDictionaryStep.setJobClass(CreateInvertedIndexDictionaryJob.class);
-//        return buildDictionaryStep;
-//    }
-//
-//    private MapReduceExecutable createInvertedIndexStep(IISegment seg, String intermediateHiveTable, String iiOutputTempPath) {
-//        // base cuboid job
-//        MapReduceExecutable buildIIStep = new MapReduceExecutable();
-//
-//        StringBuilder cmd = new StringBuilder();
-//        appendMapReduceParameters(cmd, engineConfig);
-//
-//        buildIIStep.setName(ExecutableConstants.STEP_NAME_BUILD_II);
-//
-//        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-//        appendExecCmdParameters(cmd, "tablename", intermediateHiveTable);
-//        appendExecCmdParameters(cmd, "output", iiOutputTempPath);
-//        appendExecCmdParameters(cmd, "jobname", ExecutableConstants.STEP_NAME_BUILD_II);
-//
-//        buildIIStep.setMapReduceParams(cmd.toString());
-//        buildIIStep.setMapReduceJobClass(InvertedIndexJob.class);
-//        return buildIIStep;
-//    }
-//
-//    private HadoopShellExecutable createCreateHTableStep(IISegment seg) {
-//        HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
-//        createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
-//        StringBuilder cmd = new StringBuilder();
-//        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-//        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-//
-//        createHtableStep.setJobParams(cmd.toString());
-//        createHtableStep.setJobClass(IICreateHTableJob.class);
-//
-//        return createHtableStep;
-//    }
-//
-//    private MapReduceExecutable createConvertToHfileStep(IISegment seg, String inputPath, String jobId) {
-//        MapReduceExecutable createHFilesStep = new MapReduceExecutable();
-//        createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_II_TO_HFILE);
-//        StringBuilder cmd = new StringBuilder();
-//
-//        appendMapReduceParameters(cmd, engineConfig);
-//        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-//        appendExecCmdParameters(cmd, "input", inputPath);
-//        appendExecCmdParameters(cmd, "output", getHFilePath(seg, jobId));
-//        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-//        appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getIIInstance().getName() + "_Step");
-//
-//        createHFilesStep.setMapReduceParams(cmd.toString());
-//        createHFilesStep.setMapReduceJobClass(IICreateHFileJob.class);
-//
-//        return createHFilesStep;
-//    }
-//
-//    private HadoopShellExecutable createBulkLoadStep(IISegment seg, String jobId) {
-//        HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
-//        bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
-//
-//        StringBuilder cmd = new StringBuilder();
-//        appendExecCmdParameters(cmd, "input", getHFilePath(seg, jobId));
-//        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-//        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-//
-//        bulkLoadStep.setJobParams(cmd.toString());
-//        bulkLoadStep.setJobClass(IIBulkLoadJob.class);
-//
-//        return bulkLoadStep;
-//
-//    }
-//
-//    private StringBuilder appendExecCmdParameters(StringBuilder buf, String paraName, String paraValue) {
-//        return buf.append(" -").append(paraName).append(" ").append(paraValue);
-//    }
-//
-//    private String getJobWorkingDir(String uuid) {
-//        return engineConfig.getHdfsWorkingDirectory() + "kylin-" + uuid;
-//    }
-//
-//    private String getIntermediateTableIdentity(IIJoinedFlatTableDesc intermediateTableDesc) {
-//        return engineConfig.getConfig().getHiveDatabaseForIntermediateTable() + "." + intermediateTableDesc.getTableName();
-//    }
-//}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java
index bcae524..5191aca 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
@@ -44,7 +45,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * @author yangli9
  */
 public class InvertedIndexJob extends AbstractHadoopJob {
     protected static final Logger logger = LoggerFactory.getLogger(InvertedIndexJob.class);
@@ -56,13 +56,11 @@ public class InvertedIndexJob extends AbstractHadoopJob {
         try {
             options.addOption(OPTION_JOB_NAME);
             options.addOption(OPTION_II_NAME);
-            options.addOption(OPTION_TABLE_NAME);
             options.addOption(OPTION_OUTPUT_PATH);
             parseOptions(options, args);
 
             job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
             String iiname = getOptionValue(OPTION_II_NAME);
-            String intermediateTable = getOptionValue(OPTION_TABLE_NAME);
             Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
 
             // ----------------------------------------------------------------------------
@@ -111,17 +109,11 @@ public class InvertedIndexJob extends AbstractHadoopJob {
    
     private void setupMapper(IISegment segment) throws IOException {
 
-//        String[] dbTableNames = HadoopUtil.parseHiveTableName(intermediateTable);
-//        HCatInputFormat.setInput(job, dbTableNames[0], dbTableNames[1]);
-//
-//        job.setInputFormatClass(HCatInputFormat.class);
         IMRInput.IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment).getFlatTableInputFormat();
         flatTableInputFormat.configureJob(job);
 
-
         job.setMapperClass(InvertedIndexMapper.class);
         job.setMapOutputKeyClass(LongWritable.class);
-        job.setMapOutputValueClass(ImmutableBytesWritable.class);
         job.setPartitionerClass(InvertedIndexPartitioner.class);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java
index 88249ed..a1251a3 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java
@@ -18,13 +18,11 @@
 
 package org.apache.kylin.engine.mr.invertedindex;
 
-import java.io.IOException;
-import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.MRUtil;
@@ -33,20 +31,19 @@ import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.index.TableRecord;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 
+import java.io.IOException;
+
 /**
  * @author yangli9
  */
-public class InvertedIndexMapper<KEYIN> extends KylinMapper<KEYIN, Object, LongWritable, ImmutableBytesWritable> {
+public class InvertedIndexMapper<KEYIN> extends KylinMapper<KEYIN, Object, LongWritable, Writable> {
 
     private TableRecordInfo info;
-    private TableRecord rec;
 
     private LongWritable outputKey;
-    private ImmutableBytesWritable outputValue;
     private IMRInput.IMRTableInputFormat flatTableInputFormat;
 
     @Override
@@ -60,10 +57,8 @@ public class InvertedIndexMapper<KEYIN> extends KylinMapper<KEYIN, Object, LongW
         IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME));
         IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW);
         this.info = new TableRecordInfo(seg);
-        this.rec = this.info.createTableRecord();
 
         outputKey = new LongWritable();
-        outputValue = new ImmutableBytesWritable(rec.getBytes());
 
         flatTableInputFormat = MRUtil.getBatchCubingInputSide(ii.getFirstSegment()).getFlatTableInputFormat();
     }
@@ -71,17 +66,12 @@ public class InvertedIndexMapper<KEYIN> extends KylinMapper<KEYIN, Object, LongW
     @Override
     public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
 
-        String[] row = flatTableInputFormat.parseMapperInput(record);
-        rec.reset();
-        for (int i = 0; i < row.length; i++) {
-            Object fieldValue = row[i];
-            if (fieldValue != null)
-                rec.setValueString(i, fieldValue.toString());
-        }
-
-        outputKey.set(rec.getTimestamp());
-        // outputValue's backing bytes array is the same as rec
+        Writable writableRecord = (Writable) record;
+        String[] row = flatTableInputFormat.parseMapperInput(writableRecord);
+        String timestampString = row[info.getTimestampColumn()];
 
-        context.write(outputKey, outputValue);
+        outputKey.set(DateFormat.stringToMillis(timestampString));
+        // emit the raw input record unchanged, keyed by its timestamp column in millis (presumably re-parsed by InvertedIndexReducer; TODO confirm)
+        context.write(outputKey, writableRecord);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java
index 7644456..56c0b9e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java
@@ -18,35 +18,45 @@
 
 package org.apache.kylin.engine.mr.invertedindex;
 
-import java.io.IOException;
-
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.StreamingBatch;
+import org.apache.kylin.common.util.StreamingMessage;
+import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.KylinReducer;
+import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.index.IncrementalSliceMaker;
 import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.SliceBuilder;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
 import org.apache.kylin.invertedindex.model.IIRow;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
 
 /**
- * @author yangli9
  */
-public class InvertedIndexReducer extends KylinReducer<LongWritable, ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {
+public class InvertedIndexReducer extends KylinReducer<LongWritable, Object, ImmutableBytesWritable, ImmutableBytesWritable> {
 
     private TableRecordInfo info;
-    private TableRecord rec;
-    private IncrementalSliceMaker builder;
     private IIKeyValueCodec kv;
+    private IMRInput.IMRTableInputFormat flatTableInputFormat;
+    private SliceBuilder sliceBuilder;
+    private ArrayList<StreamingMessage> messages;
+    private int sliceSize;
+    private ImmutableBytesWritable immutableBytesWritable;
+    private ByteBuffer valueBuf;
 
     @Override
     protected void setup(Context context) throws IOException {
@@ -56,44 +66,61 @@ public class InvertedIndexReducer extends KylinReducer<LongWritable, ImmutableBy
         KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
         IIManager mgr = IIManager.getInstance(config);
         IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME));
-        IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW);
+        IISegment seg = ii.getFirstSegment();
         info = new TableRecordInfo(seg);
-        rec = info.createTableRecord();
-        builder = null;
         kv = new IIKeyValueCodec(info.getDigest());
+        flatTableInputFormat = MRUtil.getBatchCubingInputSide(ii.getFirstSegment()).getFlatTableInputFormat();
+        sliceSize = ii.getDescriptor().getSliceSize();
+        short shard = (short) context.getTaskAttemptID().getTaskID().getId();
+        System.out.println("Generating to shard - " + shard);
+        sliceBuilder = new SliceBuilder(seg.getIIDesc(), shard, true);
+        messages = Lists.newArrayListWithCapacity(sliceSize);
+        immutableBytesWritable = new ImmutableBytesWritable();
+        valueBuf = ByteBuffer.allocate(1024 * 1024); // 1MB
     }
 
     @Override
-    public void reduce(LongWritable key, Iterable<ImmutableBytesWritable> values, Context context) //
+    public void reduce(LongWritable key, Iterable<Object> values, Context context) //
             throws IOException, InterruptedException {
-        for (ImmutableBytesWritable v : values) {
-            rec.setBytes(v.get(), v.getOffset(), v.getLength());
-
-            if (builder == null) {
-                builder = new IncrementalSliceMaker(info, rec.getShard());
-            }
-
-            //TODO: to delete this log
-            System.out.println(rec.getShard() + " - " + rec);
-
-            Slice slice = builder.append(rec);
-            if (slice != null) {
-                output(slice, context);
+        for (Object v : values) {
+            String[] row = flatTableInputFormat.parseMapperInput(v);
+            messages.add((parse(row)));
+            if (messages.size() >= sliceSize) {
+                buildAndOutput(new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), context);
+                messages = Lists.newArrayList();
             }
         }
     }
 
+    private StreamingMessage parse(String[] row) {
+        return new StreamingMessage(Lists.newArrayList(row), System.currentTimeMillis(), System.currentTimeMillis(), Collections.<String, Object> emptyMap());
+    }
+
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {
-        Slice slice = builder.close();
-        if (slice != null) {
-            output(slice, context);
+        if (!messages.isEmpty()) {
+            buildAndOutput(new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), context);
+            messages.clear();
         }
+
     }
 
-    private void output(Slice slice, Context context) throws IOException, InterruptedException {
+    private void buildAndOutput(StreamingBatch streamingBatch, Context context) throws IOException, InterruptedException {
+        final Slice slice = sliceBuilder.buildSlice(streamingBatch);
+        ImmutableBytesWritable value, dictionary;
         for (IIRow pair : kv.encodeKeyValue(slice)) {
-            context.write(pair.getKey(), pair.getValue());
+            value = pair.getValue();
+            dictionary = pair.getDictionary();
+            int newLength = 4 + value.getLength() + dictionary.getLength();
+            if (newLength > valueBuf.limit()) {
+                valueBuf = ByteBuffer.allocate(newLength);
+            }
+            valueBuf.clear();
+            valueBuf.putInt(value.getLength());
+            valueBuf.put(value.get(), value.getOffset(), value.getLength());
+            valueBuf.put(dictionary.get(), dictionary.getOffset(), dictionary.getLength());
+            immutableBytesWritable.set(valueBuf.array(), 0, newLength);
+            context.write(pair.getKey(), immutableBytesWritable);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateIIInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateIIInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateIIInfoAfterBuildStep.java
new file mode 100644
index 0000000..ef9b1c3
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateIIInfoAfterBuildStep.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+
+import java.io.IOException;
+
+/**
+ */
+public class UpdateIIInfoAfterBuildStep extends AbstractExecutable {
+
+    private static final String II_NAME = "iiName";
+    private static final String JOB_ID = "jobId";
+
+    public UpdateIIInfoAfterBuildStep() {
+        super();
+    }
+
+    public void setInvertedIndexName(String cubeName) {
+        this.setParam(II_NAME, cubeName);
+    }
+
+    private String getInvertedIndexName() {
+        return getParam(II_NAME);
+    }
+
+    public void setJobId(String id) {
+        setParam(JOB_ID, id);
+    }
+
+    private String getJobId() {
+        return getParam(JOB_ID);
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+
+        IIManager mgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
+        IIInstance ii = mgr.getII(getInvertedIndexName());
+        IISegment segment = ii.getFirstSegment();
+        segment.setStatus(SegmentStatusEnum.READY);
+        
+        segment.setLastBuildJobID(getJobId());
+        segment.setLastBuildTime(System.currentTimeMillis());
+
+        try {
+            mgr.updateII(ii);
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+        } catch (IOException e) {
+            logger.error("fail to update inverted index after build", e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        }
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateInvertedIndexInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateInvertedIndexInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateInvertedIndexInfoAfterBuildStep.java
deleted file mode 100644
index 277dea5..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateInvertedIndexInfoAfterBuildStep.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.invertedindex;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-
-import java.io.IOException;
-
-/**
- */
-public class UpdateInvertedIndexInfoAfterBuildStep extends AbstractExecutable {
-
-    private static final String SEGMENT_ID = "segmentId";
-    private static final String II_NAME = "iiName";
-    private static final String JOB_ID = "jobId";
-
-    public UpdateInvertedIndexInfoAfterBuildStep() {
-        super();
-    }
-
-    public void setInvertedIndexName(String cubeName) {
-        this.setParam(II_NAME, cubeName);
-    }
-
-    private String getInvertedIndexName() {
-        return getParam(II_NAME);
-    }
-
-    public void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    private String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-    
-    public void setJobId(String id) {
-        setParam(JOB_ID, id);
-    }
-
-    private String getJobId() {
-        return getParam(JOB_ID);
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-
-        IIManager mgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
-        IIInstance ii = mgr.getII(getInvertedIndexName());
-        IISegment segment = ii.getFirstSegment();
-        segment.setStatus(SegmentStatusEnum.READY);
-        
-        segment.setLastBuildJobID(getJobId());
-        segment.setLastBuildTime(System.currentTimeMillis());
-
-        try {
-            mgr.updateII(ii);
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
-        } catch (IOException e) {
-            logger.error("fail to update inverted index after build", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
index d8cb24b..1cf3d98 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
@@ -33,6 +33,8 @@
  */
 package org.apache.kylin.engine.streaming;
 
+import org.apache.kylin.common.util.StreamingBatch;
+
 /**
  */
 public interface IStreamingInput {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
index 475e43a..4a3b8b8 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
@@ -36,6 +36,7 @@ package org.apache.kylin.engine.streaming;
 import java.util.Map;
 
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.util.StreamingBatch;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.engine.streaming.util.StreamingUtils;
 import org.apache.kylin.metadata.model.IBuildable;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatch.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatch.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatch.java
deleted file mode 100644
index c7c7d29..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatch.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-package org.apache.kylin.engine.streaming;
-
-import java.util.List;
-
-import org.apache.kylin.common.util.Pair;
-
-/**
- */
-public final class StreamingBatch {
-
-    private final List<StreamingMessage> messages;
-
-    private final Pair<Long, Long> timeRange;
-
-    public StreamingBatch(List<StreamingMessage> messages, Pair<Long, Long> timeRange) {
-        this.messages = messages;
-        this.timeRange = timeRange;
-    }
-
-    public List<StreamingMessage> getMessages() {
-        return messages;
-    }
-
-    public Pair<Long, Long> getTimeRange() {
-        return timeRange;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java
index d55ccdb..93cda2d 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java
@@ -36,6 +36,7 @@ package org.apache.kylin.engine.streaming;
 import java.util.Map;
 
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.util.StreamingBatch;
 import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.metadata.model.IBuildable;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingMessage.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingMessage.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingMessage.java
deleted file mode 100644
index 7885902..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingMessage.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package org.apache.kylin.engine.streaming;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-/**
- */
-public class StreamingMessage {
-
-    private final List<String> data;
-
-    private long offset;
-
-    private long timestamp;
-
-    private Map<String, Object> params;
-
-    public static final StreamingMessage EOF = new StreamingMessage(Collections.<String> emptyList(), 0L, 0L, Collections.<String, Object> emptyMap());
-
-    public StreamingMessage(List<String> data, long offset, long timestamp, Map<String, Object> params) {
-        this.data = data;
-        this.offset = offset;
-        this.timestamp = timestamp;
-        this.params = params;
-    }
-
-    public final List<String> getData() {
-        return data;
-    }
-
-    public final long getOffset() {
-        return offset;
-    }
-
-    public final long getTimestamp() {
-        return timestamp;
-    }
-
-    public Map<String, Object> getParams() {
-        return params;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
index 64a3061..ae72218 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
@@ -54,9 +54,9 @@ import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
 import org.apache.kylin.cube.inmemcubing.InMemCubeBuilder;
 import org.apache.kylin.cube.util.CubingUtils;
 import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.engine.streaming.StreamingBatch;
+import org.apache.kylin.common.util.StreamingBatch;
 import org.apache.kylin.engine.streaming.StreamingBatchBuilder;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/invertedindex/SliceBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/invertedindex/SliceBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/invertedindex/SliceBuilder.java
deleted file mode 100644
index fa5a0b2..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/invertedindex/SliceBuilder.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.streaming.invertedindex;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.engine.streaming.StreamingBatch;
-import org.apache.kylin.engine.streaming.StreamingMessage;
-import org.apache.kylin.invertedindex.index.BatchSliceMaker;
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecord;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.util.IIDictionaryBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.util.List;
-
-/**
- */
-public final class SliceBuilder {
-
-    private static Logger logger = LoggerFactory.getLogger(SliceBuilder.class);
-
-    private final BatchSliceMaker sliceMaker;
-    private final IIDesc iiDesc;
-    private final boolean useLocalDict;
-
-    public SliceBuilder(IIDesc desc, short shard, boolean useLocalDict) {
-        this.iiDesc = desc;
-        this.sliceMaker = new BatchSliceMaker(desc, shard);
-        this.useLocalDict = useLocalDict;
-    }
-
-    public Slice buildSlice(StreamingBatch microStreamBatch) {
-        final List<List<String>> messages = Lists.transform(microStreamBatch.getMessages(), new Function<StreamingMessage, List<String>>() {
-            @Nullable
-            @Override
-            public List<String> apply(@Nullable StreamingMessage input) {
-                return input.getData();
-            }
-        });
-        final Dictionary<?>[] dictionaries = useLocalDict ? IIDictionaryBuilder.buildDictionary(messages, iiDesc) : new Dictionary[iiDesc.listAllColumns().size()];
-        TableRecordInfo tableRecordInfo = new TableRecordInfo(iiDesc, dictionaries);
-        return build(messages, tableRecordInfo, dictionaries);
-    }
-
-    private Slice build(List<List<String>> table, final TableRecordInfo tableRecordInfo, Dictionary<?>[] localDictionary) {
-        final Slice slice = sliceMaker.makeSlice(tableRecordInfo.getDigest(), Lists.transform(table, new Function<List<String>, TableRecord>() {
-            @Nullable
-            @Override
-            public TableRecord apply(@Nullable List<String> input) {
-                TableRecord result = tableRecordInfo.createTableRecord();
-                for (int i = 0; i < input.size(); i++) {
-                    result.setValueString(i, input.get(i));
-                }
-                return result;
-            }
-        }));
-        slice.setLocalDictionaries(localDictionary);
-        return slice;
-    }
-}