You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/11/06 13:05:43 UTC

[1/3] incubator-kylin git commit: KYLIN-1116 use local dictionary for inverted index building

Repository: incubator-kylin
Updated Branches:
  refs/heads/2.x-staging 8a77b8600 -> b3fefd140


KYLIN-1116 use local dictionary for inverted index building


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/b3fefd14
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/b3fefd14
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/b3fefd14

Branch: refs/heads/2.x-staging
Commit: b3fefd1406b81c87e3cdede96c010fcade7ef1a0
Parents: d5d8a40
Author: shaofengshi <sh...@apache.org>
Authored: Fri Nov 6 20:02:30 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Nov 6 20:04:02 2015 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/job/BuildIIWithStreamTest.java  |  2 +-
 .../org/apache/kylin/job/hadoop/invertedindex/IITest.java |  2 +-
 .../engine/mr/invertedindex/InvertedIndexReducer.java     |  2 +-
 .../apache/kylin/invertedindex/index/SliceBuilder.java    |  6 ++----
 .../java/org/apache/kylin/invertedindex/model/IIDesc.java | 10 ----------
 5 files changed, 5 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b3fefd14/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index 2d40b09..ddfd399 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -190,7 +190,7 @@ public class BuildIIWithStreamTest {
         ToolRunner.run(new IICreateHTableJob(), args);
 
         final IIDesc iiDesc = segment.getIIDesc();
-        final SliceBuilder sliceBuilder = new SliceBuilder(desc, (short) 0, iiDesc.isUseLocalDictionary());
+        final SliceBuilder sliceBuilder = new SliceBuilder(desc, (short) 0);
 
         List<String[]> sorted = getSortedRows(reader, desc.getTimestampColumn());
         int count = sorted.size();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b3fefd14/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index a393ce3..ac91efb 100644
--- a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -102,7 +102,7 @@ public class IITest extends LocalFileMetadataTestCase {
         StreamingBatch batch = new StreamingBatch(streamingMessages, Pair.newPair(0L, System.currentTimeMillis()));
 
         iiRows = Lists.newArrayList();
-        final Slice slice = new SliceBuilder(iiDesc, (short) 0, true).buildSlice((batch));
+        final Slice slice = new SliceBuilder(iiDesc, (short) 0).buildSlice((batch));
         IIKeyValueCodec codec = new IIKeyValueCodec(slice.getInfo());
         for (IIRow iiRow : codec.encodeKeyValue(slice)) {
             iiRows.add(iiRow);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b3fefd14/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java
index 56c0b9e..c223159 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java
@@ -73,7 +73,7 @@ public class InvertedIndexReducer extends KylinReducer<LongWritable, Object, Imm
         sliceSize = ii.getDescriptor().getSliceSize();
         short shard = (short) context.getTaskAttemptID().getTaskID().getId();
         System.out.println("Generating to shard - " + shard);
-        sliceBuilder = new SliceBuilder(seg.getIIDesc(), shard, true);
+        sliceBuilder = new SliceBuilder(seg.getIIDesc(), shard);
         messages = Lists.newArrayListWithCapacity(sliceSize);
         immutableBytesWritable = new ImmutableBytesWritable();
         valueBuf = ByteBuffer.allocate(1024 * 1024); // 1MB

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b3fefd14/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
index 1c293d7..ef63b0f 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
@@ -38,12 +38,10 @@ public final class SliceBuilder {
 
     private final BatchSliceMaker sliceMaker;
     private final IIDesc iiDesc;
-    private final boolean useLocalDict;
 
-    public SliceBuilder(IIDesc desc, short shard, boolean useLocalDict) {
+    public SliceBuilder(IIDesc desc, short shard) {
         this.iiDesc = desc;
         this.sliceMaker = new BatchSliceMaker(desc, shard);
-        this.useLocalDict = useLocalDict;
     }
 
     public Slice buildSlice(StreamingBatch microStreamBatch) {
@@ -54,7 +52,7 @@ public final class SliceBuilder {
                 return input.getData();
             }
         });
-        final Dictionary<?>[] dictionaries = useLocalDict ? IIDictionaryBuilder.buildDictionary(messages, iiDesc) : new Dictionary[iiDesc.listAllColumns().size()];
+        final Dictionary<?>[] dictionaries = IIDictionaryBuilder.buildDictionary(messages, iiDesc);
         TableRecordInfo tableRecordInfo = new TableRecordInfo(iiDesc, dictionaries);
         return build(messages, tableRecordInfo, dictionaries);
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b3fefd14/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
index bfa4eaa..71737dc 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
@@ -78,8 +78,6 @@ public class IIDesc extends RootPersistentEntity {
     private short sharding = 1; // parallelism
     @JsonProperty("slice_size")
     private int sliceSize = 50000; // no. rows
-    @JsonProperty("useLocalDictionary")
-    private boolean useLocalDictionary = true;
 
     @JsonProperty("engine_type")
     private int engineType = IEngineAware.ID_MR_II;
@@ -374,14 +372,6 @@ public class IIDesc extends RootPersistentEntity {
         this.name = name;
     }
 
-    public boolean isUseLocalDictionary() {
-        return useLocalDictionary;
-    }
-
-    public void setUseLocalDictionary(boolean useLocalDictionary) {
-        this.useLocalDictionary = useLocalDictionary;
-    }
-
     public String calculateSignature() {
         MessageDigest md = null;
         try {


[3/3] incubator-kylin git commit: KYLIN-1116 Use local dictionary for InvertedIndex batch building

Posted by sh...@apache.org.
KYLIN-1116 Use local dictionary for InvertedIndex batch building


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d5d8a403
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d5d8a403
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d5d8a403

Branch: refs/heads/2.x-staging
Commit: d5d8a403830d3a3be35662cc19d2dc263a7b7fbd
Parents: 8a77b86
Author: shaofengshi <sh...@apache.org>
Authored: Fri Nov 6 16:24:29 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Nov 6 20:04:02 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/job/BuildIIWithStreamTest.java |  15 +-
 .../kylin/job/hadoop/invertedindex/IITest.java  |  11 +-
 .../kylin/common/util/StreamingBatch.java       |  59 +++++
 .../kylin/common/util/StreamingMessage.java     |  43 ++++
 .../realization/IRealizationSegment.java        |   1 -
 .../kylin/engine/mr/JobBuilderSupport.java      |   8 +-
 .../engine/mr/common/AbstractHadoopJob.java     |   3 -
 .../mr/invertedindex/BatchIIJobBuilder.java     |  61 +-----
 .../CreateInvertedIndexDictionaryJob.java       |  70 ------
 .../IIDistinctColumnsCombiner.java              |  58 -----
 .../mr/invertedindex/IIDistinctColumnsJob.java  | 138 ------------
 .../invertedindex/IIDistinctColumnsMapper.java  |  75 -------
 .../invertedindex/IIDistinctColumnsReducer.java |  77 -------
 .../engine/mr/invertedindex/IIJobBuilder.java   | 219 -------------------
 .../mr/invertedindex/InvertedIndexJob.java      |  10 +-
 .../mr/invertedindex/InvertedIndexMapper.java   |  32 +--
 .../mr/invertedindex/InvertedIndexReducer.java  |  89 +++++---
 .../UpdateIIInfoAfterBuildStep.java             |  84 +++++++
 .../UpdateInvertedIndexInfoAfterBuildStep.java  |  93 --------
 .../kylin/engine/streaming/IStreamingInput.java |   2 +
 .../streaming/OneOffStreamingBuilder.java       |   1 +
 .../kylin/engine/streaming/StreamingBatch.java  |  61 ------
 .../engine/streaming/StreamingBatchBuilder.java |   1 +
 .../engine/streaming/StreamingMessage.java      |  43 ----
 .../streaming/cube/StreamingCubeBuilder.java    |   4 +-
 .../streaming/invertedindex/SliceBuilder.java   |  81 -------
 .../invertedindex/test_kylin_ii_inner_join.json |  45 +---
 .../invertedindex/test_kylin_ii_left_join.json  |  45 +---
 .../invertedindex/test_streaming_table_ii.json  |  22 +-
 .../apache/kylin/invertedindex/IIManager.java   |  50 +----
 .../apache/kylin/invertedindex/IISegment.java   |  51 +----
 .../kylin/invertedindex/index/SliceBuilder.java |  77 +++++++
 .../invertedindex/IIInstanceTest.java           |   5 -
 .../apache/kylin/source/hive/HiveMRInput.java   |   3 +
 .../kylin/source/kafka/KafkaStreamingInput.java |   4 +-
 .../kylin/source/kafka/StreamingParser.java     |   2 +-
 .../source/kafka/StringStreamingParser.java     |   2 +-
 .../source/kafka/TimedJsonStreamParser.java     |   2 +-
 .../kafka/diagnose/KafkaInputAnalyzer.java      |   2 +-
 .../kylin/source/kafka/util/KafkaUtils.java     |   2 +-
 .../storage/hbase/ii/IICreateHFileMapper.java   |  19 +-
 .../storage/hbase/ii/IICreateHTableJob.java     |  31 ++-
 .../storage/hbase/steps/HBaseMROutput.java      |   2 +-
 .../kylin/storage/hbase/steps/HBaseMRSteps.java |  18 +-
 .../hbase/util/DeployCoprocessorCLI.java        |   3 +-
 .../storage/hbase/util/StorageCleanupJob.java   |   2 -
 .../hbase/common/TsConditionEraserTest.java     |   6 +-
 .../endpoint/EndpointAggregationTest.java       |  27 ++-
 .../endpoint/TableRecordInfoTest.java           |   5 +
 .../endpoint/TsConditionExtractorTest.java      |   5 +
 50 files changed, 469 insertions(+), 1300 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index b64a7c5..2d40b09 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -51,8 +51,8 @@ import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
-import org.apache.kylin.engine.streaming.StreamingBatch;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingBatch;
+import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
@@ -61,7 +61,7 @@ import org.apache.kylin.invertedindex.model.IIDesc;
 import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
 import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
 import org.apache.kylin.invertedindex.model.IIRow;
-import org.apache.kylin.engine.streaming.invertedindex.SliceBuilder;
+import org.apache.kylin.invertedindex.index.SliceBuilder;
 import org.apache.kylin.job.common.ShellExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
@@ -196,14 +196,13 @@ public class BuildIIWithStreamTest {
         int count = sorted.size();
         ArrayList<StreamingMessage> messages = Lists.newArrayList();
         for (String[] row : sorted) {
-            if (messages.size() < iiDesc.getSliceSize()) {
-                messages.add(parse(row));
-            } else {
+            messages.add((parse(row)));
+            if (messages.size() >= iiDesc.getSliceSize()) {
                 build(sliceBuilder, new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable);
-                messages = Lists.newArrayList();
-                messages.add((parse(row)));
+                messages.clear();
             }
         }
+        
         if (!messages.isEmpty()) {
             build(sliceBuilder, new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable);
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index 200156a..a393ce3 100644
--- a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -23,10 +23,11 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.kylin.common.util.FIFOIterable;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.engine.streaming.StreamingBatch;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingBatch;
+import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.invertedindex.index.Slice;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
@@ -35,7 +36,7 @@ import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
 import org.apache.kylin.invertedindex.model.IIKeyValueCodecWithState;
 import org.apache.kylin.invertedindex.model.IIRow;
 import org.apache.kylin.invertedindex.model.KeyValueCodec;
-import org.apache.kylin.engine.streaming.invertedindex.SliceBuilder;
+import org.apache.kylin.invertedindex.index.SliceBuilder;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.ConstantTupleFilter;
@@ -146,6 +147,10 @@ public class IITest extends LocalFileMetadataTestCase {
     @Test
     public void IIEndpointTest() {
         TableRecordInfo info = new TableRecordInfo(ii.getDescriptor());
+        if (ii.getFirstSegment() == null) {
+            IISegment segment = IIManager.getInstance(getTestConfig()).buildSegment(ii, 0, System.currentTimeMillis());
+            ii.getSegments().add(segment);
+        }
         CoprocessorRowType type = CoprocessorRowType.fromTableRecordInfo(info, ii.getFirstSegment().getColumns());
         CoprocessorProjector projector = CoprocessorProjector.makeForEndpoint(info, Collections.singletonList(ii.getDescriptor().findColumnRef("default.test_kylin_fact", "lstg_format_name")));
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/core-common/src/main/java/org/apache/kylin/common/util/StreamingBatch.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/StreamingBatch.java b/core-common/src/main/java/org/apache/kylin/common/util/StreamingBatch.java
new file mode 100644
index 0000000..ae5b72f
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/StreamingBatch.java
@@ -0,0 +1,59 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+package org.apache.kylin.common.util;
+
+import java.util.List;
+
+/**
+ */
+public final class StreamingBatch {
+
+    private final List<StreamingMessage> messages;
+
+    private final Pair<Long, Long> timeRange;
+
+    public StreamingBatch(List<StreamingMessage> messages, Pair<Long, Long> timeRange) {
+        this.messages = messages;
+        this.timeRange = timeRange;
+    }
+
+    public List<StreamingMessage> getMessages() {
+        return messages;
+    }
+
+    public Pair<Long, Long> getTimeRange() {
+        return timeRange;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
new file mode 100644
index 0000000..2c150ff
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
@@ -0,0 +1,43 @@
+package org.apache.kylin.common.util;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ */
+public class StreamingMessage {
+
+    private final List<String> data;
+
+    private long offset;
+
+    private long timestamp;
+
+    private Map<String, Object> params;
+
+    public static final StreamingMessage EOF = new StreamingMessage(Collections.<String> emptyList(), 0L, 0L, Collections.<String, Object> emptyMap());
+
+    public StreamingMessage(List<String> data, long offset, long timestamp, Map<String, Object> params) {
+        this.data = data;
+        this.offset = offset;
+        this.timestamp = timestamp;
+        this.params = params;
+    }
+
+    public final List<String> getData() {
+        return data;
+    }
+
+    public final long getOffset() {
+        return offset;
+    }
+
+    public final long getTimestamp() {
+        return timestamp;
+    }
+
+    public Map<String, Object> getParams() {
+        return params;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationSegment.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationSegment.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationSegment.java
index afab86b..3ac82a0 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationSegment.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationSegment.java
@@ -4,7 +4,6 @@ import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 
 /**
- * Created by shaoshi on 10/30/15.
  */
 public interface IRealizationSegment extends IBuildable {
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 8c770f9..e3b07d8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -25,13 +25,12 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
-import org.apache.kylin.engine.mr.invertedindex.UpdateInvertedIndexInfoAfterBuildStep;
+import org.apache.kylin.engine.mr.invertedindex.UpdateIIInfoAfterBuildStep;
 import org.apache.kylin.engine.mr.steps.CreateDictionaryJob;
 import org.apache.kylin.engine.mr.steps.FactDistinctColumnsJob;
 import org.apache.kylin.engine.mr.steps.MergeDictionaryStep;
 import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterBuildStep;
 import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterMergeStep;
-import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
 
@@ -125,11 +124,10 @@ public class JobBuilderSupport {
 
 
 
-    public UpdateInvertedIndexInfoAfterBuildStep createUpdateInvertedIndexInfoAfterBuildStep(String jobId) {
-        final UpdateInvertedIndexInfoAfterBuildStep updateIIInfoStep = new UpdateInvertedIndexInfoAfterBuildStep();
+    public UpdateIIInfoAfterBuildStep createUpdateIIInfoAfterBuildStep(String jobId) {
+        final UpdateIIInfoAfterBuildStep updateIIInfoStep = new UpdateIIInfoAfterBuildStep();
         updateIIInfoStep.setName(ExecutableConstants.STEP_NAME_UPDATE_II_INFO);
         updateIIInfoStep.setInvertedIndexName(seg.getRealization().getName());
-        updateIIInfoStep.setSegmentId(seg.getUuid());
         updateIIInfoStep.setJobId(jobId);
         return updateIIInfoStep;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 8782bbe..366a730 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -337,9 +337,6 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
             TableDesc table = metaMgr.getTableDesc(tableName);
             dumpList.add(table.getResourcePath());
         }
-        for (IISegment segment : ii.getSegments()) {
-            dumpList.addAll(segment.getDictionaryPaths());
-        }
 
         attachKylinPropsAndMetadata(dumpList, conf);
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java
index 97e27d0..e7501b8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java
@@ -18,30 +18,16 @@
 
 package org.apache.kylin.engine.mr.invertedindex;
 
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
 import org.apache.kylin.engine.mr.IMROutput;
-import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
-import org.apache.kylin.engine.mr.steps.BaseCuboidJob;
-import org.apache.kylin.engine.mr.steps.NDCuboidJob;
 import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
 import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.metadata.realization.IRealizationSegment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
 public class BatchIIJobBuilder extends JobBuilderSupport {
     
     private static final Logger logger = LoggerFactory.getLogger(BatchIIJobBuilder.class);
@@ -64,54 +50,20 @@ public class BatchIIJobBuilder extends JobBuilderSupport {
         final String iiRootPath = getRealizationRootPath(jobId) + "/";
         // Phase 1: Create Flat Table
         inputSide.addStepPhase1_CreateFlatTable(result);
-
-        final String intermediateTableIdentity = seg.getJoinedFlatTableDesc().getTableName();
-        // Phase 2: Build Dictionary
-        result.addTask(createIIFactDistinctColumnsStep(seg, intermediateTableIdentity, getFactDistinctColumnsPath(jobId)));
-        result.addTask(createIIBuildDictionaryStep(seg, getFactDistinctColumnsPath(jobId)));
-
-        // Phase 3: Build Cube
-        result.addTask(createInvertedIndexStep((IISegment)seg, intermediateTableIdentity, iiRootPath));
+    
+        // Phase 2: Build Inverted Index
+        result.addTask(createInvertedIndexStep((IISegment)seg, iiRootPath));
         outputSide.addStepPhase3_BuildII(result, iiRootPath);
 
-        // Phase 4: Update Metadata & Cleanup
-        result.addTask(createUpdateInvertedIndexInfoAfterBuildStep(jobId));
+        // Phase 3: Update Metadata & Cleanup
+        result.addTask(createUpdateIIInfoAfterBuildStep(jobId));
         inputSide.addStepPhase4_Cleanup(result);
         outputSide.addStepPhase4_Cleanup(result);
 
         return result;
     }
 
-    private MapReduceExecutable createIIFactDistinctColumnsStep(IRealizationSegment seg, String factTableName, String output) {
-        MapReduceExecutable result = new MapReduceExecutable();
-        result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
-        result.setMapReduceJobClass(IIDistinctColumnsJob.class);
-        StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc());
-        appendExecCmdParameters(cmd, "tablename", factTableName);
-        appendExecCmdParameters(cmd, "iiname", seg.getRealization().getName());
-        appendExecCmdParameters(cmd, "output", output);
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getRealization().getName() + "_Step");
-
-        result.setMapReduceParams(cmd.toString());
-        return result;
-    }
-
-    private HadoopShellExecutable createIIBuildDictionaryStep(IRealizationSegment seg, String factDistinctColumnsPath) {
-        // base cuboid job
-        HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
-        buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
-        StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "iiname", seg.getRealization().getName());
-        appendExecCmdParameters(cmd, "input", factDistinctColumnsPath);
-
-        buildDictionaryStep.setJobParams(cmd.toString());
-        buildDictionaryStep.setJobClass(CreateInvertedIndexDictionaryJob.class);
-        return buildDictionaryStep;
-    }
-
-    private MapReduceExecutable createInvertedIndexStep(IISegment seg, String intermediateHiveTable, String iiOutputTempPath) {
-        // base cuboid job
+    private MapReduceExecutable createInvertedIndexStep(IISegment seg, String iiOutputTempPath) {
         MapReduceExecutable buildIIStep = new MapReduceExecutable();
 
         StringBuilder cmd = new StringBuilder();
@@ -120,7 +72,6 @@ public class BatchIIJobBuilder extends JobBuilderSupport {
         buildIIStep.setName(ExecutableConstants.STEP_NAME_BUILD_II);
 
         appendExecCmdParameters(cmd, "iiname", seg.getRealization().getName());
-        appendExecCmdParameters(cmd, "tablename", intermediateHiveTable);
         appendExecCmdParameters(cmd, "output", iiOutputTempPath);
         appendExecCmdParameters(cmd, "jobname", ExecutableConstants.STEP_NAME_BUILD_II);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/CreateInvertedIndexDictionaryJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/CreateInvertedIndexDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/CreateInvertedIndexDictionaryJob.java
deleted file mode 100644
index 39d74b4..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/CreateInvertedIndexDictionaryJob.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.invertedindex;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.dict.DistinctColumnValuesProvider;
-import org.apache.kylin.engine.mr.DFSFileTable;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.ReadableTable;
-
-/**
- */
-public class CreateInvertedIndexDictionaryJob extends AbstractHadoopJob {
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_II_NAME);
-            options.addOption(OPTION_INPUT_PATH);
-            parseOptions(options, args);
-
-            final String iiname = getOptionValue(OPTION_II_NAME);
-            final String factColumnsInputPath = getOptionValue(OPTION_INPUT_PATH);
-            final KylinConfig config = KylinConfig.getInstanceFromEnv();
-
-            IIManager mgr = IIManager.getInstance(config);
-            IIInstance ii = mgr.getII(iiname);
-
-            mgr.buildInvertedIndexDictionary(ii.getFirstSegment(), new DistinctColumnValuesProvider() {
-                @Override
-                public ReadableTable getDistinctValuesFor(TblColRef col) {
-                    return new DFSFileTable(factColumnsInputPath + "/" + col.getName(), -1);
-                }
-            });
-            return 0;
-        } catch (Exception e) {
-            printUsage(options);
-            throw e;
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        int exitCode = ToolRunner.run(new CreateInvertedIndexDictionaryJob(), args);
-        System.exit(exitCode);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsCombiner.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsCombiner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsCombiner.java
deleted file mode 100644
index 651ad63..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsCombiner.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.invertedindex;
-
-import java.io.IOException;
-import java.util.HashSet;
-
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.engine.mr.KylinReducer;
-
-/**
- * @author yangli9
- */
-public class IIDistinctColumnsCombiner extends KylinReducer<ShortWritable, Text, ShortWritable, Text> {
-
-    private Text outputValue = new Text();
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-
-    }
-
-    @Override
-    public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
-
-        HashSet<ByteArray> set = new HashSet<ByteArray>();
-        for (Text textValue : values) {
-            ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
-            set.add(value);
-        }
-
-        for (ByteArray value : set) {
-            outputValue.set(value.array(), value.offset(), value.length());
-            context.write(key, outputValue);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsJob.java
deleted file mode 100644
index fe968b1..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsJob.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.invertedindex;
-
-import java.io.IOException;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.mr.IMRInput;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.IntermediateColumnDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author yangli9
- */
-public class IIDistinctColumnsJob extends AbstractHadoopJob {
-    protected static final Logger logger = LoggerFactory.getLogger(IIDistinctColumnsJob.class);
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_JOB_NAME);
-            options.addOption(OPTION_TABLE_NAME);
-            options.addOption(OPTION_II_NAME);
-            options.addOption(OPTION_OUTPUT_PATH);
-            parseOptions(options, args);
-
-            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
-            String tableName = getOptionValue(OPTION_TABLE_NAME).toUpperCase();
-            String iiName = getOptionValue(OPTION_II_NAME);
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-
-            // ----------------------------------------------------------------------------
-
-            logger.info("Starting: " + job.getJobName() + " on table " + tableName);
-
-            IIManager iiMgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
-            IIInstance ii = iiMgr.getII(iiName);
-            job.getConfiguration().set(BatchConstants.TABLE_NAME, tableName);
-            job.getConfiguration().set(BatchConstants.TABLE_COLUMNS, getColumns(ii));
-
-            setJobClasspath(job);
-
-            
-            setupMapper(ii.getFirstSegment());
-            setupReducer(output);
-
-            Configuration conf = job.getConfiguration();
-            conf.set(BatchConstants.CFG_II_NAME, ii.getName());
-            attachKylinPropsAndMetadata(ii, conf);
-            return waitForCompletion(job);
-
-        } catch (Exception e) {
-            printUsage(options);
-            throw e;
-        }
-
-    }
-
-    private String getColumns(IIInstance ii) {
-        IIJoinedFlatTableDesc iiflat = new IIJoinedFlatTableDesc(ii.getDescriptor());
-        StringBuilder buf = new StringBuilder();
-        for (IntermediateColumnDesc col : iiflat.getColumnList()) {
-            if (buf.length() > 0)
-                buf.append(",");
-            buf.append(col.getColumnName());
-        }
-        return buf.toString();
-    }
-
-    private void setupMapper(IISegment segment) throws IOException {
-        
-        IMRInput.IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment).getFlatTableInputFormat();
-        flatTableInputFormat.configureJob(job);
-
-        job.setMapperClass(IIDistinctColumnsMapper.class);
-        job.setCombinerClass(IIDistinctColumnsCombiner.class);
-        job.setMapOutputKeyClass(ShortWritable.class);
-        job.setMapOutputValueClass(Text.class);
-    }
-
-    private void setupReducer(Path output) throws IOException {
-        job.setReducerClass(IIDistinctColumnsReducer.class);
-        job.setOutputFormatClass(SequenceFileOutputFormat.class);
-        job.setOutputKeyClass(NullWritable.class);
-        job.setOutputValueClass(Text.class);
-
-        FileOutputFormat.setOutputPath(job, output);
-        job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
-
-        job.setNumReduceTasks(1);
-
-        deletePath(job.getConfiguration(), output);
-    }
-
-    public static void main(String[] args) throws Exception {
-        IIDistinctColumnsJob job = new IIDistinctColumnsJob();
-        int exitCode = ToolRunner.run(job, args);
-        System.exit(exitCode);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsMapper.java
deleted file mode 100644
index c431ecd..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsMapper.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.invertedindex;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.engine.mr.IMRInput;
-import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-
-/**
- * @author yangli9
- */
-public class IIDistinctColumnsMapper<KEYIN> extends KylinMapper<KEYIN, Object, ShortWritable, Text> {
-
-    private ShortWritable outputKey = new ShortWritable();
-    private Text outputValue = new Text();
-
-    protected IMRInput.IMRTableInputFormat flatTableInputFormat;
-    
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-        Configuration conf = context.getConfiguration();
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-
-        String iiName = conf.get(BatchConstants.CFG_II_NAME);
-        IIInstance iiInstance = IIManager.getInstance(config).getII(iiName);
-        IISegment seg = iiInstance.getFirstSegment();
-        flatTableInputFormat = MRUtil.getBatchCubingInputSide(seg).getFlatTableInputFormat();
-    }
-
-    @Override
-    public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
-
-        String[] row = flatTableInputFormat.parseMapperInput(record);
-        
-        for (short i = 0; i < row.length; i++) {
-            outputKey.set(i);
-            if (row[i] == null)
-                continue;
-            byte[] bytes = Bytes.toBytes(row[i].toString());
-            outputValue.set(bytes, 0, bytes.length);
-            context.write(outputKey, outputValue);
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsReducer.java
deleted file mode 100644
index d50385f..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsReducer.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.invertedindex;
-
-import java.io.IOException;
-import java.util.HashSet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.engine.mr.KylinReducer;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-
-/**
- * @author yangli9
- */
-public class IIDistinctColumnsReducer extends KylinReducer<ShortWritable, Text, NullWritable, Text> {
-
-    private String[] columns;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-
-        Configuration conf = context.getConfiguration();
-        this.columns = conf.get(BatchConstants.TABLE_COLUMNS).split(",");
-    }
-
-    @Override
-    public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
-        String columnName = columns[key.get()];
-
-        HashSet<ByteArray> set = new HashSet<ByteArray>();
-        for (Text textValue : values) {
-            ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
-            set.add(value);
-        }
-
-        Configuration conf = context.getConfiguration();
-        FileSystem fs = FileSystem.get(conf);
-        String outputPath = conf.get(BatchConstants.OUTPUT_PATH);
-        FSDataOutputStream out = fs.create(new Path(outputPath, columnName));
-
-        try {
-            for (ByteArray value : set) {
-                out.write(value.array(), value.offset(), value.length());
-                out.write('\n');
-            }
-        } finally {
-            out.close();
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJobBuilder.java
deleted file mode 100644
index 18d3001..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJobBuilder.java
+++ /dev/null
@@ -1,219 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// * 
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// * 
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-//*/
-//
-//package org.apache.kylin.engine.mr.invertedindex;
-//
-//import java.io.IOException;
-//import java.text.SimpleDateFormat;
-//import java.util.Date;
-//import java.util.TimeZone;
-//
-//import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
-//import org.apache.kylin.engine.mr.common.MapReduceExecutable;
-//import org.apache.kylin.invertedindex.IISegment;
-//import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
-//import org.apache.kylin.job.constant.ExecutableConstants;
-//import org.apache.kylin.job.engine.JobEngineConfig;
-//import org.apache.kylin.job.execution.AbstractExecutable;
-//import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity;
-//
-//import com.google.common.base.Preconditions;
-//
-///**
-// */
-//public final class IIJobBuilder {
-//
-//    final JobEngineConfig engineConfig;
-//
-//    public IIJobBuilder(JobEngineConfig engineConfig) {
-//        this.engineConfig = engineConfig;
-//    }
-//
-//    public IIJob buildJob(IISegment seg, String submitter) {
-//        checkPreconditions(seg);
-//
-//        IIJob result = initialJob(seg, "BUILD", submitter);
-//        final String jobId = result.getId();
-//        final IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(seg.getIIDesc());
-//        final String intermediateTableIdentity = getIntermediateTableIdentity(intermediateTableDesc);
-//        final String factDistinctColumnsPath = getIIDistinctColumnsPath(seg, jobId);
-//        final String iiRootPath = getJobWorkingDir(jobId) + "/" + seg.getIIInstance().getName() + "/";
-//        final String iiPath = iiRootPath + "*";
-//
-//        final AbstractExecutable intermediateHiveTableStep = createFlatHiveTableStep(intermediateTableDesc, jobId);
-//        result.addTask(intermediateHiveTableStep);
-//
-//        result.addTask(createFactDistinctColumnsStep(seg, intermediateTableIdentity, jobId, factDistinctColumnsPath));
-//
-//        result.addTask(createBuildDictionaryStep(seg, factDistinctColumnsPath));
-//
-//        result.addTask(createInvertedIndexStep(seg, intermediateTableIdentity, iiRootPath));
-//
-//        // create htable step
-//        result.addTask(createCreateHTableStep(seg));
-//
-//        // generate hfiles step
-//        result.addTask(createConvertToHfileStep(seg, iiPath, jobId));
-//
-//        // bulk load step
-//        result.addTask(createBulkLoadStep(seg, jobId));
-//
-//        return result;
-//    }
-//
-//    private IIJob initialJob(IISegment seg, String type, String submitter) {
-//        IIJob result = new IIJob();
-//        SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss");
-//        format.setTimeZone(TimeZone.getTimeZone(engineConfig.getTimeZone()));
-//        result.setIIName(seg.getIIInstance().getName());
-//        result.setSegmentId(seg.getUuid());
-//        result.setName(seg.getIIInstance().getName() + " - " + seg.getName() + " - " + type + " - " + format.format(new Date(System.currentTimeMillis())));
-//        result.setSubmitter(submitter);
-//        return result;
-//    }
-//
-//    private void checkPreconditions(IISegment seg) {
-//        Preconditions.checkNotNull(seg, "segment cannot be null");
-//        Preconditions.checkNotNull(engineConfig, "jobEngineConfig cannot be null");
-//    }
-//
-//    private void appendMapReduceParameters(StringBuilder builder, JobEngineConfig engineConfig) {
-//        try {
-//            String jobConf = engineConfig.getHadoopJobConfFilePath(RealizationCapacity.MEDIUM);
-//            if (jobConf != null && jobConf.length() > 0) {
-//                builder.append(" -conf ").append(jobConf);
-//            }
-//        } catch (IOException e) {
-//            throw new RuntimeException(e);
-//        }
-//    }
-//
-//    private String getIIDistinctColumnsPath(IISegment seg, String jobUuid) {
-//        return getJobWorkingDir(jobUuid) + "/" + seg.getIIInstance().getName() + "/ii_distinct_columns";
-//    }
-//
-//    private String getHFilePath(IISegment seg, String jobId) {
-//        return getJobWorkingDir(jobId) + "/" + seg.getIIInstance().getName() + "/hfile/";
-//    }
-//
-//    private MapReduceExecutable createFactDistinctColumnsStep(IISegment seg, String factTableName, String jobId, String output) {
-//        MapReduceExecutable result = new MapReduceExecutable();
-//        result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
-//        result.setMapReduceJobClass(IIDistinctColumnsJob.class);
-//        StringBuilder cmd = new StringBuilder();
-//        appendMapReduceParameters(cmd, engineConfig);
-//        appendExecCmdParameters(cmd, "tablename", factTableName);
-//        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-//        appendExecCmdParameters(cmd, "output", output);
-//        appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getIIInstance().getName() + "_Step");
-//
-//        result.setMapReduceParams(cmd.toString());
-//        return result;
-//    }
-//
-//    private HadoopShellExecutable createBuildDictionaryStep(IISegment seg, String factDistinctColumnsPath) {
-//        // base cuboid job
-//        HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
-//        buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
-//        StringBuilder cmd = new StringBuilder();
-//        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-//        appendExecCmdParameters(cmd, "input", factDistinctColumnsPath);
-//
-//        buildDictionaryStep.setJobParams(cmd.toString());
-//        buildDictionaryStep.setJobClass(CreateInvertedIndexDictionaryJob.class);
-//        return buildDictionaryStep;
-//    }
-//
-//    private MapReduceExecutable createInvertedIndexStep(IISegment seg, String intermediateHiveTable, String iiOutputTempPath) {
-//        // base cuboid job
-//        MapReduceExecutable buildIIStep = new MapReduceExecutable();
-//
-//        StringBuilder cmd = new StringBuilder();
-//        appendMapReduceParameters(cmd, engineConfig);
-//
-//        buildIIStep.setName(ExecutableConstants.STEP_NAME_BUILD_II);
-//
-//        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-//        appendExecCmdParameters(cmd, "tablename", intermediateHiveTable);
-//        appendExecCmdParameters(cmd, "output", iiOutputTempPath);
-//        appendExecCmdParameters(cmd, "jobname", ExecutableConstants.STEP_NAME_BUILD_II);
-//
-//        buildIIStep.setMapReduceParams(cmd.toString());
-//        buildIIStep.setMapReduceJobClass(InvertedIndexJob.class);
-//        return buildIIStep;
-//    }
-//
-//    private HadoopShellExecutable createCreateHTableStep(IISegment seg) {
-//        HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
-//        createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
-//        StringBuilder cmd = new StringBuilder();
-//        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-//        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-//
-//        createHtableStep.setJobParams(cmd.toString());
-//        createHtableStep.setJobClass(IICreateHTableJob.class);
-//
-//        return createHtableStep;
-//    }
-//
-//    private MapReduceExecutable createConvertToHfileStep(IISegment seg, String inputPath, String jobId) {
-//        MapReduceExecutable createHFilesStep = new MapReduceExecutable();
-//        createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_II_TO_HFILE);
-//        StringBuilder cmd = new StringBuilder();
-//
-//        appendMapReduceParameters(cmd, engineConfig);
-//        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-//        appendExecCmdParameters(cmd, "input", inputPath);
-//        appendExecCmdParameters(cmd, "output", getHFilePath(seg, jobId));
-//        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-//        appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getIIInstance().getName() + "_Step");
-//
-//        createHFilesStep.setMapReduceParams(cmd.toString());
-//        createHFilesStep.setMapReduceJobClass(IICreateHFileJob.class);
-//
-//        return createHFilesStep;
-//    }
-//
-//    private HadoopShellExecutable createBulkLoadStep(IISegment seg, String jobId) {
-//        HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
-//        bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
-//
-//        StringBuilder cmd = new StringBuilder();
-//        appendExecCmdParameters(cmd, "input", getHFilePath(seg, jobId));
-//        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-//        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-//
-//        bulkLoadStep.setJobParams(cmd.toString());
-//        bulkLoadStep.setJobClass(IIBulkLoadJob.class);
-//
-//        return bulkLoadStep;
-//
-//    }
-//
-//    private StringBuilder appendExecCmdParameters(StringBuilder buf, String paraName, String paraValue) {
-//        return buf.append(" -").append(paraName).append(" ").append(paraValue);
-//    }
-//
-//    private String getJobWorkingDir(String uuid) {
-//        return engineConfig.getHdfsWorkingDirectory() + "kylin-" + uuid;
-//    }
-//
-//    private String getIntermediateTableIdentity(IIJoinedFlatTableDesc intermediateTableDesc) {
-//        return engineConfig.getConfig().getHiveDatabaseForIntermediateTable() + "." + intermediateTableDesc.getTableName();
-//    }
-//}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java
index bcae524..5191aca 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
@@ -44,7 +45,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * @author yangli9
  */
 public class InvertedIndexJob extends AbstractHadoopJob {
     protected static final Logger logger = LoggerFactory.getLogger(InvertedIndexJob.class);
@@ -56,13 +56,11 @@ public class InvertedIndexJob extends AbstractHadoopJob {
         try {
             options.addOption(OPTION_JOB_NAME);
             options.addOption(OPTION_II_NAME);
-            options.addOption(OPTION_TABLE_NAME);
             options.addOption(OPTION_OUTPUT_PATH);
             parseOptions(options, args);
 
             job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
             String iiname = getOptionValue(OPTION_II_NAME);
-            String intermediateTable = getOptionValue(OPTION_TABLE_NAME);
             Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
 
             // ----------------------------------------------------------------------------
@@ -111,17 +109,11 @@ public class InvertedIndexJob extends AbstractHadoopJob {
    
     private void setupMapper(IISegment segment) throws IOException {
 
-//        String[] dbTableNames = HadoopUtil.parseHiveTableName(intermediateTable);
-//        HCatInputFormat.setInput(job, dbTableNames[0], dbTableNames[1]);
-//
-//        job.setInputFormatClass(HCatInputFormat.class);
         IMRInput.IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment).getFlatTableInputFormat();
         flatTableInputFormat.configureJob(job);
 
-
         job.setMapperClass(InvertedIndexMapper.class);
         job.setMapOutputKeyClass(LongWritable.class);
-        job.setMapOutputValueClass(ImmutableBytesWritable.class);
         job.setPartitionerClass(InvertedIndexPartitioner.class);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java
index 88249ed..a1251a3 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java
@@ -18,13 +18,11 @@
 
 package org.apache.kylin.engine.mr.invertedindex;
 
-import java.io.IOException;
-import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.MRUtil;
@@ -33,20 +31,19 @@ import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.index.TableRecord;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 
+import java.io.IOException;
+
 /**
  * @author yangli9
  */
-public class InvertedIndexMapper<KEYIN> extends KylinMapper<KEYIN, Object, LongWritable, ImmutableBytesWritable> {
+public class InvertedIndexMapper<KEYIN> extends KylinMapper<KEYIN, Object, LongWritable, Writable> {
 
     private TableRecordInfo info;
-    private TableRecord rec;
 
     private LongWritable outputKey;
-    private ImmutableBytesWritable outputValue;
     private IMRInput.IMRTableInputFormat flatTableInputFormat;
 
     @Override
@@ -60,10 +57,8 @@ public class InvertedIndexMapper<KEYIN> extends KylinMapper<KEYIN, Object, LongW
         IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME));
         IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW);
         this.info = new TableRecordInfo(seg);
-        this.rec = this.info.createTableRecord();
 
         outputKey = new LongWritable();
-        outputValue = new ImmutableBytesWritable(rec.getBytes());
 
         flatTableInputFormat = MRUtil.getBatchCubingInputSide(ii.getFirstSegment()).getFlatTableInputFormat();
     }
@@ -71,17 +66,12 @@ public class InvertedIndexMapper<KEYIN> extends KylinMapper<KEYIN, Object, LongW
     @Override
     public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
 
-        String[] row = flatTableInputFormat.parseMapperInput(record);
-        rec.reset();
-        for (int i = 0; i < row.length; i++) {
-            Object fieldValue = row[i];
-            if (fieldValue != null)
-                rec.setValueString(i, fieldValue.toString());
-        }
-
-        outputKey.set(rec.getTimestamp());
-        // outputValue's backing bytes array is the same as rec
+        Writable writableRecord = (Writable) record;
+        String[] row = flatTableInputFormat.parseMapperInput(writableRecord);
+        String timestampString = row[info.getTimestampColumn()];
 
-        context.write(outputKey, outputValue);
+        outputKey.set(DateFormat.stringToMillis(timestampString));
+        //
+        context.write(outputKey, writableRecord);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java
index 7644456..56c0b9e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java
@@ -18,35 +18,45 @@
 
 package org.apache.kylin.engine.mr.invertedindex;
 
-import java.io.IOException;
-
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.StreamingBatch;
+import org.apache.kylin.common.util.StreamingMessage;
+import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.KylinReducer;
+import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.index.IncrementalSliceMaker;
 import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.SliceBuilder;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
 import org.apache.kylin.invertedindex.model.IIRow;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
 
 /**
- * @author yangli9
  */
-public class InvertedIndexReducer extends KylinReducer<LongWritable, ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {
+public class InvertedIndexReducer extends KylinReducer<LongWritable, Object, ImmutableBytesWritable, ImmutableBytesWritable> {
 
     private TableRecordInfo info;
-    private TableRecord rec;
-    private IncrementalSliceMaker builder;
     private IIKeyValueCodec kv;
+    private IMRInput.IMRTableInputFormat flatTableInputFormat;
+    private SliceBuilder sliceBuilder;
+    private ArrayList<StreamingMessage> messages;
+    private int sliceSize;
+    private ImmutableBytesWritable immutableBytesWritable;
+    private ByteBuffer valueBuf;
 
     @Override
     protected void setup(Context context) throws IOException {
@@ -56,44 +66,61 @@ public class InvertedIndexReducer extends KylinReducer<LongWritable, ImmutableBy
         KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
         IIManager mgr = IIManager.getInstance(config);
         IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME));
-        IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW);
+        IISegment seg = ii.getFirstSegment();
         info = new TableRecordInfo(seg);
-        rec = info.createTableRecord();
-        builder = null;
         kv = new IIKeyValueCodec(info.getDigest());
+        flatTableInputFormat = MRUtil.getBatchCubingInputSide(ii.getFirstSegment()).getFlatTableInputFormat();
+        sliceSize = ii.getDescriptor().getSliceSize();
+        short shard = (short) context.getTaskAttemptID().getTaskID().getId();
+        System.out.println("Generating to shard - " + shard);
+        sliceBuilder = new SliceBuilder(seg.getIIDesc(), shard, true);
+        messages = Lists.newArrayListWithCapacity(sliceSize);
+        immutableBytesWritable = new ImmutableBytesWritable();
+        valueBuf = ByteBuffer.allocate(1024 * 1024); // 1MB
     }
 
     @Override
-    public void reduce(LongWritable key, Iterable<ImmutableBytesWritable> values, Context context) //
+    public void reduce(LongWritable key, Iterable<Object> values, Context context) //
             throws IOException, InterruptedException {
-        for (ImmutableBytesWritable v : values) {
-            rec.setBytes(v.get(), v.getOffset(), v.getLength());
-
-            if (builder == null) {
-                builder = new IncrementalSliceMaker(info, rec.getShard());
-            }
-
-            //TODO: to delete this log
-            System.out.println(rec.getShard() + " - " + rec);
-
-            Slice slice = builder.append(rec);
-            if (slice != null) {
-                output(slice, context);
+        for (Object v : values) {
+            String[] row = flatTableInputFormat.parseMapperInput(v);
+            messages.add((parse(row)));
+            if (messages.size() >= sliceSize) {
+                buildAndOutput(new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), context);
+                messages = Lists.newArrayList();
             }
         }
     }
 
+    private StreamingMessage parse(String[] row) {
+        return new StreamingMessage(Lists.newArrayList(row), System.currentTimeMillis(), System.currentTimeMillis(), Collections.<String, Object> emptyMap());
+    }
+
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {
-        Slice slice = builder.close();
-        if (slice != null) {
-            output(slice, context);
+        if (!messages.isEmpty()) {
+            buildAndOutput(new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), context);
+            messages.clear();
         }
+
     }
 
-    private void output(Slice slice, Context context) throws IOException, InterruptedException {
+    private void buildAndOutput(StreamingBatch streamingBatch, Context context) throws IOException, InterruptedException {
+        final Slice slice = sliceBuilder.buildSlice(streamingBatch);
+        ImmutableBytesWritable value, dictionary;
         for (IIRow pair : kv.encodeKeyValue(slice)) {
-            context.write(pair.getKey(), pair.getValue());
+            value = pair.getValue();
+            dictionary = pair.getDictionary();
+            int newLength = 4 + value.getLength() + dictionary.getLength();
+            if (newLength > valueBuf.limit()) {
+                valueBuf = ByteBuffer.allocate(newLength);
+            }
+            valueBuf.clear();
+            valueBuf.putInt(value.getLength());
+            valueBuf.put(value.get(), value.getOffset(), value.getLength());
+            valueBuf.put(dictionary.get(), dictionary.getOffset(), dictionary.getLength());
+            immutableBytesWritable.set(valueBuf.array(), 0, newLength);
+            context.write(pair.getKey(), immutableBytesWritable);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateIIInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateIIInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateIIInfoAfterBuildStep.java
new file mode 100644
index 0000000..ef9b1c3
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateIIInfoAfterBuildStep.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+
+import java.io.IOException;
+
+/**
+ */
+public class UpdateIIInfoAfterBuildStep extends AbstractExecutable {
+
+    private static final String II_NAME = "iiName";
+    private static final String JOB_ID = "jobId";
+
+    public UpdateIIInfoAfterBuildStep() {
+        super();
+    }
+
+    public void setInvertedIndexName(String cubeName) {
+        this.setParam(II_NAME, cubeName);
+    }
+
+    private String getInvertedIndexName() {
+        return getParam(II_NAME);
+    }
+
+    public void setJobId(String id) {
+        setParam(JOB_ID, id);
+    }
+
+    private String getJobId() {
+        return getParam(JOB_ID);
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+
+        IIManager mgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
+        IIInstance ii = mgr.getII(getInvertedIndexName());
+        IISegment segment = ii.getFirstSegment();
+        segment.setStatus(SegmentStatusEnum.READY);
+        
+        segment.setLastBuildJobID(getJobId());
+        segment.setLastBuildTime(System.currentTimeMillis());
+
+        try {
+            mgr.updateII(ii);
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+        } catch (IOException e) {
+            logger.error("fail to update inverted index after build", e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        }
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateInvertedIndexInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateInvertedIndexInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateInvertedIndexInfoAfterBuildStep.java
deleted file mode 100644
index 277dea5..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateInvertedIndexInfoAfterBuildStep.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.invertedindex;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-
-import java.io.IOException;
-
-/**
- */
-public class UpdateInvertedIndexInfoAfterBuildStep extends AbstractExecutable {
-
-    private static final String SEGMENT_ID = "segmentId";
-    private static final String II_NAME = "iiName";
-    private static final String JOB_ID = "jobId";
-
-    public UpdateInvertedIndexInfoAfterBuildStep() {
-        super();
-    }
-
-    public void setInvertedIndexName(String cubeName) {
-        this.setParam(II_NAME, cubeName);
-    }
-
-    private String getInvertedIndexName() {
-        return getParam(II_NAME);
-    }
-
-    public void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    private String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-    
-    public void setJobId(String id) {
-        setParam(JOB_ID, id);
-    }
-
-    private String getJobId() {
-        return getParam(JOB_ID);
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-
-        IIManager mgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
-        IIInstance ii = mgr.getII(getInvertedIndexName());
-        IISegment segment = ii.getFirstSegment();
-        segment.setStatus(SegmentStatusEnum.READY);
-        
-        segment.setLastBuildJobID(getJobId());
-        segment.setLastBuildTime(System.currentTimeMillis());
-
-        try {
-            mgr.updateII(ii);
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
-        } catch (IOException e) {
-            logger.error("fail to update inverted index after build", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
index d8cb24b..1cf3d98 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
@@ -33,6 +33,8 @@
  */
 package org.apache.kylin.engine.streaming;
 
+import org.apache.kylin.common.util.StreamingBatch;
+
 /**
  */
 public interface IStreamingInput {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
index 475e43a..4a3b8b8 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
@@ -36,6 +36,7 @@ package org.apache.kylin.engine.streaming;
 import java.util.Map;
 
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.util.StreamingBatch;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.engine.streaming.util.StreamingUtils;
 import org.apache.kylin.metadata.model.IBuildable;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatch.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatch.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatch.java
deleted file mode 100644
index c7c7d29..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatch.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-package org.apache.kylin.engine.streaming;
-
-import java.util.List;
-
-import org.apache.kylin.common.util.Pair;
-
-/**
- */
-public final class StreamingBatch {
-
-    private final List<StreamingMessage> messages;
-
-    private final Pair<Long, Long> timeRange;
-
-    public StreamingBatch(List<StreamingMessage> messages, Pair<Long, Long> timeRange) {
-        this.messages = messages;
-        this.timeRange = timeRange;
-    }
-
-    public List<StreamingMessage> getMessages() {
-        return messages;
-    }
-
-    public Pair<Long, Long> getTimeRange() {
-        return timeRange;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java
index d55ccdb..93cda2d 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java
@@ -36,6 +36,7 @@ package org.apache.kylin.engine.streaming;
 import java.util.Map;
 
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.util.StreamingBatch;
 import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.metadata.model.IBuildable;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingMessage.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingMessage.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingMessage.java
deleted file mode 100644
index 7885902..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingMessage.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package org.apache.kylin.engine.streaming;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-/**
- */
-public class StreamingMessage {
-
-    private final List<String> data;
-
-    private long offset;
-
-    private long timestamp;
-
-    private Map<String, Object> params;
-
-    public static final StreamingMessage EOF = new StreamingMessage(Collections.<String> emptyList(), 0L, 0L, Collections.<String, Object> emptyMap());
-
-    public StreamingMessage(List<String> data, long offset, long timestamp, Map<String, Object> params) {
-        this.data = data;
-        this.offset = offset;
-        this.timestamp = timestamp;
-        this.params = params;
-    }
-
-    public final List<String> getData() {
-        return data;
-    }
-
-    public final long getOffset() {
-        return offset;
-    }
-
-    public final long getTimestamp() {
-        return timestamp;
-    }
-
-    public Map<String, Object> getParams() {
-        return params;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
index 64a3061..ae72218 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
@@ -54,9 +54,9 @@ import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
 import org.apache.kylin.cube.inmemcubing.InMemCubeBuilder;
 import org.apache.kylin.cube.util.CubingUtils;
 import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.engine.streaming.StreamingBatch;
+import org.apache.kylin.common.util.StreamingBatch;
 import org.apache.kylin.engine.streaming.StreamingBatchBuilder;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/invertedindex/SliceBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/invertedindex/SliceBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/invertedindex/SliceBuilder.java
deleted file mode 100644
index fa5a0b2..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/invertedindex/SliceBuilder.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.streaming.invertedindex;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.engine.streaming.StreamingBatch;
-import org.apache.kylin.engine.streaming.StreamingMessage;
-import org.apache.kylin.invertedindex.index.BatchSliceMaker;
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecord;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.util.IIDictionaryBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.util.List;
-
-/**
- */
-public final class SliceBuilder {
-
-    private static Logger logger = LoggerFactory.getLogger(SliceBuilder.class);
-
-    private final BatchSliceMaker sliceMaker;
-    private final IIDesc iiDesc;
-    private final boolean useLocalDict;
-
-    public SliceBuilder(IIDesc desc, short shard, boolean useLocalDict) {
-        this.iiDesc = desc;
-        this.sliceMaker = new BatchSliceMaker(desc, shard);
-        this.useLocalDict = useLocalDict;
-    }
-
-    public Slice buildSlice(StreamingBatch microStreamBatch) {
-        final List<List<String>> messages = Lists.transform(microStreamBatch.getMessages(), new Function<StreamingMessage, List<String>>() {
-            @Nullable
-            @Override
-            public List<String> apply(@Nullable StreamingMessage input) {
-                return input.getData();
-            }
-        });
-        final Dictionary<?>[] dictionaries = useLocalDict ? IIDictionaryBuilder.buildDictionary(messages, iiDesc) : new Dictionary[iiDesc.listAllColumns().size()];
-        TableRecordInfo tableRecordInfo = new TableRecordInfo(iiDesc, dictionaries);
-        return build(messages, tableRecordInfo, dictionaries);
-    }
-
-    private Slice build(List<List<String>> table, final TableRecordInfo tableRecordInfo, Dictionary<?>[] localDictionary) {
-        final Slice slice = sliceMaker.makeSlice(tableRecordInfo.getDigest(), Lists.transform(table, new Function<List<String>, TableRecord>() {
-            @Nullable
-            @Override
-            public TableRecord apply(@Nullable List<String> input) {
-                TableRecord result = tableRecordInfo.createTableRecord();
-                for (int i = 0; i < input.size(); i++) {
-                    result.setValueString(i, input.get(i));
-                }
-                return result;
-            }
-        }));
-        slice.setLocalDictionaries(localDictionary);
-        return slice;
-    }
-}


[2/3] incubator-kylin git commit: KYLIN-1116 Use local dictionary for InvertedIndex batch building

Posted by sh...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_inner_join.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_inner_join.json b/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_inner_join.json
index 237bdd7..4c0c3ef 100644
--- a/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_inner_join.json
+++ b/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_inner_join.json
@@ -4,49 +4,8 @@
   "owner": null,
   "version": null,
   "cost": 10,
-  "status": "READY",
-  "segments": [
-    {
-      "uuid": null,
-      "name": "19700101000000_20140901000000",
-      "status": "READY",
-      "dictionaries": {
-        "DEFAULT.TEST_KYLIN_FACT/LSTG_SITE_ID": "/dict/EDW.TEST_SITES/SITE_ID/4ef43390-b07e-4d4c-872a-77c0bd783acb.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/SITE_ID": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/SITE_ID/30c9d5f0-abe4-4d1c-a147-610234d90ff1.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL2_NAME": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL2_NAME/9355165b-06ad-4c04-977c-a251e66e7e98.dict",
-        "DEFAULT.TEST_KYLIN_FACT/LSTG_FORMAT_NAME": "/dict/DEFAULT.TEST_KYLIN_FACT/LSTG_FORMAT_NAME/d5c40465-75e1-40bc-a960-06308f0134a6.dict",
-        "DEFAULT.TEST_KYLIN_FACT/SELLER_ID": "/dict/DEFAULT.TEST_KYLIN_FACT/SELLER_ID/fea3a18d-3f20-4b8b-a880-7af93e69241b.dict",
-        "EDW.TEST_SITES/CRE_USER": "/dict/EDW.TEST_SITES/CRE_USER/244af7a2-7352-4b30-811f-46e637d7a133.dict",
-        "DEFAULT.TEST_KYLIN_FACT/LEAF_CATEG_ID": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/LEAF_CATEG_ID/38361fbc-b875-4273-b8b4-1b8a26ef8570.dict",
-        "EDW.TEST_CAL_DT/CAL_DT": "/dict/EDW.TEST_CAL_DT/CAL_DT/5e4b4f35-0fc8-4940-b123-b18c9f77da19.dict",
-        "DEFAULT.TEST_KYLIN_FACT/TRANS_ID": "/dict/DEFAULT.TEST_KYLIN_FACT/TRANS_ID/7fb8fed8-7f3b-4089-a85d-3ac07f575c82.dict",
-        "EDW.TEST_CAL_DT/WEEK_BEG_DT": "/dict/EDW.TEST_CAL_DT/WEEK_BEG_DT/962b5f64-bee1-49ee-a072-af882193b719.dict",
-        "DEFAULT.TEST_KYLIN_FACT/SLR_SEGMENT_CD": "/dict/EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_CD/bec11fda-9ae0-4668-98ea-f0f4e9dd6993.dict",
-        "EDW.TEST_SITES/SITE_NAME": "/dict/EDW.TEST_SITES/SITE_NAME/f363531d-e969-4264-bffd-ac18f8f47220.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_DATE": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_DATE/1fc93a94-1feb-4af4-8078-81a6f1b65e2b.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/META_CATEG_NAME": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/META_CATEG_NAME/895739d6-27e1-4ecc-b798-5851c319ea40.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/LEAF_CATEG_ID": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/LEAF_CATEG_ID/38361fbc-b875-4273-b8b4-1b8a26ef8570.dict",
-        "EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_CD": "/dict/EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_CD/bec11fda-9ae0-4668-98ea-f0f4e9dd6993.dict",
-        "EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_DESC": "/dict/EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_DESC/bfb86010-bf4c-4534-a2b0-59d020aed197.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD1": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD1/8ae44fb8-b01a-4db1-a901-dc5f463038cb.dict",
-        "EDW.TEST_SITES/SITE_ID": "/dict/EDW.TEST_SITES/SITE_ID/4ef43390-b07e-4d4c-872a-77c0bd783acb.dict",
-        "DEFAULT.TEST_KYLIN_FACT/CAL_DT": "/dict/EDW.TEST_CAL_DT/CAL_DT/5e4b4f35-0fc8-4940-b123-b18c9f77da19.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD3": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD3/278d7283-518a-4cd0-b6e1-2573b523bf1f.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_USER": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_USER/bb1bb7a5-b02d-45eb-b3c8-f1a4515264ca.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL3_NAME": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL3_NAME/b2d6fae1-eaac-4ac2-8a01-42e5c8b5c198.dict"
-      },
-      "storage_location_identifier": "test_III",
-      "date_range_start": 0,
-      "date_range_end": 0,
-      "size_kb": 0,
-      "input_records": 0,
-      "input_records_size": 0,
-      "last_build_time": 0,
-      "last_build_job_id": null,
-      "create_time": null,
-      "binary_signature": null
-    }
-  ],
+  "status": "DISABLED",
+  "segments": [],
   "last_modified": 1420016227424,
   "descriptor": "test_kylin_ii_inner_join_desc",
   "create_time": null,

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_left_join.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_left_join.json b/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_left_join.json
index 07c1970..90c21bb 100644
--- a/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_left_join.json
+++ b/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_left_join.json
@@ -4,49 +4,8 @@
   "owner": null,
   "version": null,
   "cost": 10,
-  "status": "READY",
-  "segments": [
-    {
-      "uuid": null,
-      "name": "19700101000000_20140901000000",
-      "status": "READY",
-      "dictionaries": {
-        "DEFAULT.TEST_KYLIN_FACT/LSTG_SITE_ID": "/dict/EDW.TEST_SITES/SITE_ID/4ef43390-b07e-4d4c-872a-77c0bd783acb.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/SITE_ID": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/SITE_ID/30c9d5f0-abe4-4d1c-a147-610234d90ff1.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL2_NAME": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL2_NAME/9355165b-06ad-4c04-977c-a251e66e7e98.dict",
-        "DEFAULT.TEST_KYLIN_FACT/LSTG_FORMAT_NAME": "/dict/DEFAULT.TEST_KYLIN_FACT/LSTG_FORMAT_NAME/d5c40465-75e1-40bc-a960-06308f0134a6.dict",
-        "DEFAULT.TEST_KYLIN_FACT/SELLER_ID": "/dict/DEFAULT.TEST_KYLIN_FACT/SELLER_ID/fea3a18d-3f20-4b8b-a880-7af93e69241b.dict",
-        "EDW.TEST_SITES/CRE_USER": "/dict/EDW.TEST_SITES/CRE_USER/244af7a2-7352-4b30-811f-46e637d7a133.dict",
-        "DEFAULT.TEST_KYLIN_FACT/LEAF_CATEG_ID": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/LEAF_CATEG_ID/38361fbc-b875-4273-b8b4-1b8a26ef8570.dict",
-        "EDW.TEST_CAL_DT/CAL_DT": "/dict/EDW.TEST_CAL_DT/CAL_DT/5e4b4f35-0fc8-4940-b123-b18c9f77da19.dict",
-        "DEFAULT.TEST_KYLIN_FACT/TRANS_ID": "/dict/DEFAULT.TEST_KYLIN_FACT/TRANS_ID/7fb8fed8-7f3b-4089-a85d-3ac07f575c82.dict",
-        "EDW.TEST_CAL_DT/WEEK_BEG_DT": "/dict/EDW.TEST_CAL_DT/WEEK_BEG_DT/962b5f64-bee1-49ee-a072-af882193b719.dict",
-        "DEFAULT.TEST_KYLIN_FACT/SLR_SEGMENT_CD": "/dict/EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_CD/bec11fda-9ae0-4668-98ea-f0f4e9dd6993.dict",
-        "EDW.TEST_SITES/SITE_NAME": "/dict/EDW.TEST_SITES/SITE_NAME/f363531d-e969-4264-bffd-ac18f8f47220.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_DATE": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_DATE/1fc93a94-1feb-4af4-8078-81a6f1b65e2b.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/META_CATEG_NAME": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/META_CATEG_NAME/895739d6-27e1-4ecc-b798-5851c319ea40.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/LEAF_CATEG_ID": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/LEAF_CATEG_ID/38361fbc-b875-4273-b8b4-1b8a26ef8570.dict",
-        "EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_CD": "/dict/EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_CD/bec11fda-9ae0-4668-98ea-f0f4e9dd6993.dict",
-        "EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_DESC": "/dict/EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_DESC/bfb86010-bf4c-4534-a2b0-59d020aed197.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD1": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD1/8ae44fb8-b01a-4db1-a901-dc5f463038cb.dict",
-        "EDW.TEST_SITES/SITE_ID": "/dict/EDW.TEST_SITES/SITE_ID/4ef43390-b07e-4d4c-872a-77c0bd783acb.dict",
-        "DEFAULT.TEST_KYLIN_FACT/CAL_DT": "/dict/EDW.TEST_CAL_DT/CAL_DT/5e4b4f35-0fc8-4940-b123-b18c9f77da19.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD3": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD3/278d7283-518a-4cd0-b6e1-2573b523bf1f.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_USER": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_USER/bb1bb7a5-b02d-45eb-b3c8-f1a4515264ca.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL3_NAME": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL3_NAME/b2d6fae1-eaac-4ac2-8a01-42e5c8b5c198.dict"
-      },
-      "storage_location_identifier": "test_III",
-      "date_range_start": 0,
-      "date_range_end": 0,
-      "size_kb": 0,
-      "input_records": 0,
-      "input_records_size": 0,
-      "last_build_time": 0,
-      "last_build_job_id": null,
-      "create_time": null,
-      "binary_signature": null
-    }
-  ],
+  "status": "DISABLED",
+  "segments": [],
   "last_modified": 1420016227424,
   "descriptor": "test_kylin_ii_left_join_desc",
   "create_time": null,

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/examples/test_case_data/localmeta/invertedindex/test_streaming_table_ii.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/invertedindex/test_streaming_table_ii.json b/examples/test_case_data/localmeta/invertedindex/test_streaming_table_ii.json
index a703ae4..9abe3ed 100644
--- a/examples/test_case_data/localmeta/invertedindex/test_streaming_table_ii.json
+++ b/examples/test_case_data/localmeta/invertedindex/test_streaming_table_ii.json
@@ -4,26 +4,8 @@
   "owner": null,
   "version": null,
   "cost": 10,
-  "status": "READY",
-  "segments": [
-    {
-      "uuid": null,
-      "name": "19700101000000_20190901000000",
-      "status": "READY",
-      "dictionaries": {
-      },
-      "storage_location_identifier": "KYLIN_2STEAMTEST",
-      "date_range_start": 0,
-      "date_range_end": 0,
-      "size_kb": 0,
-      "input_records": 0,
-      "input_records_size": 0,
-      "last_build_time": 0,
-      "last_build_job_id": null,
-      "create_time": null,
-      "binary_signature": null
-    }
-  ],
+  "status": "DISABLED",
+  "segments": [],
   "last_modified": 0,
   "descriptor": "test_streaming_table_ii_desc",
   "create_time": null,

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
index 7aff714..5633004 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
@@ -32,13 +32,7 @@ import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.restclient.Broadcaster;
 import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.dict.DictionaryInfo;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.dict.DistinctColumnValuesProvider;
-import org.apache.kylin.invertedindex.model.IIDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.realization.IRealization;
@@ -131,45 +125,7 @@ public class IIManager implements IRealizationProvider {
         }
         return result;
     }
-
-    public void buildInvertedIndexDictionary(IISegment iiSeg, DistinctColumnValuesProvider factTableValueProvider) throws IOException {
-        logger.info("Start building ii dictionary");
-        DictionaryManager dictMgr = getDictionaryManager();
-        IIDesc iiDesc = iiSeg.getIIInstance().getDescriptor();
-        for (TblColRef column : iiDesc.listAllColumns()) {
-            logger.info("Dealing with column {}", column);
-            if (iiDesc.isMetricsCol(column)) {
-                continue;
-            }
-
-            DictionaryInfo dict = dictMgr.buildDictionary(iiDesc.getModel(), "true", column, factTableValueProvider);
-            iiSeg.putDictResPath(column, dict.getResourcePath());
-        }
-        updateII(iiSeg.getIIInstance());
-    }
-
-    /**
-     * return null if no dictionary for given column
-     */
-    public Dictionary<?> getDictionary(IISegment iiSeg, TblColRef col) {
-        DictionaryInfo info = null;
-        try {
-            DictionaryManager dictMgr = getDictionaryManager();
-            // logger.info("Using metadata url " + metadataUrl +
-            // " for DictionaryManager");
-            String dictResPath = iiSeg.getDictResPath(col);
-            if (dictResPath == null)
-                return null;
-
-            info = dictMgr.getDictionaryInfo(dictResPath);
-            if (info == null)
-                throw new IllegalStateException("No dictionary found by " + dictResPath + ", invalid II state; II segment" + iiSeg + ", col " + col);
-        } catch (IOException e) {
-            throw new IllegalStateException("Failed to get dictionary for II segment" + iiSeg + ", col" + col, e);
-        }
-
-        return info.getDictionaryObject();
-    }
+    
 
     public IIInstance createII(IIInstance ii) throws IOException {
 
@@ -300,10 +256,6 @@ public class IIManager implements IRealizationProvider {
         }
     }
 
-    private DictionaryManager getDictionaryManager() {
-        return DictionaryManager.getInstance(config);
-    }
-
     private ResourceStore getStore() {
         return ResourceStore.getStore(this.config);
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java
index adcca8b..c3ca464 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java
@@ -19,18 +19,14 @@
 package org.apache.kylin.invertedindex;
 
 import java.text.SimpleDateFormat;
-import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 import java.util.TimeZone;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.IDictionaryAware;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.invertedindex.model.IIDesc;
 import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -49,7 +45,7 @@ import org.apache.kylin.metadata.realization.IRealizationSegment;
 
 // TODO: remove segment concept for II, append old hbase table
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class IISegment implements Comparable<IISegment>, IDictionaryAware, IRealizationSegment {
+public class IISegment implements Comparable<IISegment>, IRealizationSegment {
 
     @JsonBackReference
     private IIInstance iiInstance;
@@ -83,11 +79,6 @@ public class IISegment implements Comparable<IISegment>, IDictionaryAware, IReal
     private String binarySignature; // a hash of schema and dictionary ID,
     // used for sanity check
 
-    @JsonProperty("dictionaries")
-    private ConcurrentHashMap<String, String> dictionaries; // table/column ==>
-    // dictionary
-    // resource path
-
     private transient TableRecordInfo tableRecordInfo;
 
     /**
@@ -216,28 +207,6 @@ public class IISegment implements Comparable<IISegment>, IDictionaryAware, IReal
         return storageLocationIdentifier;
     }
 
-    public Map<String, String> getDictionaries() {
-        if (dictionaries == null)
-            dictionaries = new ConcurrentHashMap<String, String>();
-        return dictionaries;
-    }
-
-    public Collection<String> getDictionaryPaths() {
-        return getDictionaries().values();
-    }
-
-    public String getDictResPath(TblColRef col) {
-        return getDictionaries().get(dictKey(col));
-    }
-
-    public void putDictResPath(TblColRef col, String dictResPath) {
-        getDictionaries().put(dictKey(col), dictResPath);
-    }
-
-    private String dictKey(TblColRef col) {
-        return col.getTable() + "/" + col.getName();
-    }
-
     /**
      * @param storageLocationIdentifier the storageLocationIdentifier to set
      */
@@ -262,10 +231,6 @@ public class IISegment implements Comparable<IISegment>, IDictionaryAware, IReal
         return tableRecordInfo;
     }
 
-    //    public void updateDictionary(List<Dictionary<?>> dicts) {
-    //        getTableRecordInfo().updateDictionary( dicts);
-    //    }
-
     public List<TblColRef> getColumns() {
         return this.getTableRecordInfo().getColumns();
     }
@@ -275,20 +240,6 @@ public class IISegment implements Comparable<IISegment>, IDictionaryAware, IReal
         return Objects.toStringHelper(this).add("uuid", uuid).add("create_time_utc:", createTimeUTC).add("name", name).add("last_build_job_id", lastBuildJobID).add("status", status).toString();
     }
 
-    @Override
-    public int getColumnLength(TblColRef col) {
-
-        int index = getTableRecordInfo().findColumn(col);
-        return getTableRecordInfo().getDigest().length(index);
-    }
-
-    @Override
-    public Dictionary<?> getDictionary(TblColRef col) {
-
-        int index = getTableRecordInfo().findColumn(col);
-        return getTableRecordInfo().dict(index);
-    }
-
     public long getCreateTimeUTC() {
         return createTimeUTC;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
new file mode 100644
index 0000000..1c293d7
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.invertedindex.index;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.StreamingBatch;
+import org.apache.kylin.common.util.StreamingMessage;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.util.IIDictionaryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+/**
+ */
+public final class SliceBuilder {
+
+    private static Logger logger = LoggerFactory.getLogger(SliceBuilder.class);
+
+    private final BatchSliceMaker sliceMaker;
+    private final IIDesc iiDesc;
+    private final boolean useLocalDict;
+
+    public SliceBuilder(IIDesc desc, short shard, boolean useLocalDict) {
+        this.iiDesc = desc;
+        this.sliceMaker = new BatchSliceMaker(desc, shard);
+        this.useLocalDict = useLocalDict;
+    }
+
+    public Slice buildSlice(StreamingBatch microStreamBatch) {
+        final List<List<String>> messages = Lists.transform(microStreamBatch.getMessages(), new Function<StreamingMessage, List<String>>() {
+            @Nullable
+            @Override
+            public List<String> apply(@Nullable StreamingMessage input) {
+                return input.getData();
+            }
+        });
+        final Dictionary<?>[] dictionaries = useLocalDict ? IIDictionaryBuilder.buildDictionary(messages, iiDesc) : new Dictionary[iiDesc.listAllColumns().size()];
+        TableRecordInfo tableRecordInfo = new TableRecordInfo(iiDesc, dictionaries);
+        return build(messages, tableRecordInfo, dictionaries);
+    }
+
+    private Slice build(List<List<String>> table, final TableRecordInfo tableRecordInfo, Dictionary<?>[] localDictionary) {
+        final Slice slice = sliceMaker.makeSlice(tableRecordInfo.getDigest(), Lists.transform(table, new Function<List<String>, TableRecord>() {
+            @Nullable
+            @Override
+            public TableRecord apply(@Nullable List<String> input) {
+                TableRecord result = tableRecordInfo.createTableRecord();
+                for (int i = 0; i < input.size(); i++) {
+                    result.setValueString(i, input.get(i));
+                }
+                return result;
+            }
+        }));
+        slice.setLocalDictionaries(localDictionary);
+        return slice;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
index baafacd..8a0c2ba 100644
--- a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
+++ b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
@@ -53,11 +53,6 @@ public class IIInstanceTest extends LocalFileMetadataTestCase {
 
         Assert.assertTrue(iiInstances.size() > 0);
 
-        IIInstance instance = iiInstances.get(0);
-
-        Dictionary dict = mgr.getDictionary(instance.getFirstSegment(), instance.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", "LSTG_SITE_ID"));
-
-        Assert.assertNotNull(dict);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 1eb2683..b8d1333 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
@@ -78,6 +79,8 @@ public class HiveMRInput implements IMRInput {
             try {
                 HCatInputFormat.setInput(job, dbName, tableName);
                 job.setInputFormatClass(HCatInputFormat.class);
+
+                job.setMapOutputValueClass(org.apache.hive.hcatalog.data.DefaultHCatRecord.class);
             } catch (IOException e) {
                 throw new RuntimeException(e);
             }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
index a1ab712..ee5a555 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
@@ -48,8 +48,8 @@ import kafka.message.MessageAndOffset;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.engine.streaming.IStreamingInput;
-import org.apache.kylin.engine.streaming.StreamingBatch;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingBatch;
+import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.apache.kylin.source.kafka.util.KafkaRequester;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index aace8bc..3455f1d 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -47,7 +47,7 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.engine.streaming.StreamingManager;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.metadata.model.IntermediateColumnDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.kafka.config.KafkaConfig;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
index c0e506f..9691ea7 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
@@ -40,7 +40,7 @@ import java.util.List;
 
 import kafka.message.MessageAndOffset;
 
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.metadata.model.TblColRef;
 
 import com.google.common.collect.Lists;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 4fae228..00f93a5 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -43,7 +43,7 @@ import kafka.message.MessageAndOffset;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.TimeUtil;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
index de5e58e..0e29a0c 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
@@ -40,7 +40,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.DaemonThreadFactory;
 import org.apache.kylin.common.util.OptionsHelper;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.StreamingParser;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
index 2833ea4..96b7fa7 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
@@ -11,7 +11,7 @@ import kafka.javaapi.PartitionMetadata;
 import kafka.message.MessageAndOffset;
 
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.source.kafka.StreamingParser;
 import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java
index e4b688f..0e0a8ce 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.storage.hbase.ii;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
@@ -43,13 +44,29 @@ public class IICreateHFileMapper extends KylinMapper<ImmutableBytesWritable, Imm
     @Override
     protected void map(ImmutableBytesWritable key, ImmutableBytesWritable value, Context context) throws IOException, InterruptedException {
 
+        ByteBuffer buffer = ByteBuffer.wrap(value.get(), value.getOffset(), value.getLength());
+        int totalLength = value.getLength();
+        int valueLength = buffer.getInt();
+        int dictionaryLength = totalLength - valueLength - 4;
         KeyValue kv = new KeyValue(key.get(), key.getOffset(), key.getLength(), //
                 IIDesc.HBASE_FAMILY_BYTES, 0, IIDesc.HBASE_FAMILY_BYTES.length, //
                 IIDesc.HBASE_QUALIFIER_BYTES, 0, IIDesc.HBASE_QUALIFIER_BYTES.length, //
                 timestamp, Type.Put, //
-                value.get(), value.getOffset(), value.getLength());
+                buffer.array(), buffer.position(), valueLength);
 
+        // write value
         context.write(key, kv);
+
+        kv = new KeyValue(key.get(), key.getOffset(), key.getLength(), //
+                IIDesc.HBASE_FAMILY_BYTES, 0, IIDesc.HBASE_FAMILY_BYTES.length, //
+                IIDesc.HBASE_DICTIONARY_BYTES, 0, IIDesc.HBASE_DICTIONARY_BYTES.length, //
+                timestamp, Type.Put, //
+                buffer.array(), buffer.position() + valueLength, dictionaryLength);
+
+
+        // write dictionary
+        context.write(key, kv);
+        
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java
index 0a72a91..bcfe346 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java
@@ -47,7 +47,7 @@ public class IICreateHTableJob extends AbstractHadoopJob {
     @Override
     public int run(String[] args) throws Exception {
         Options options = new Options();
-
+        HBaseAdmin admin = null;
         try {
             options.addOption(OPTION_II_NAME);
             options.addOption(OPTION_HTABLE_NAME);
@@ -61,6 +61,22 @@ public class IICreateHTableJob extends AbstractHadoopJob {
             IIInstance ii = iiManager.getII(iiName);
             int sharding = ii.getDescriptor().getSharding();
 
+
+            Configuration conf = HBaseConfiguration.create(getConf());
+            // check if the table already exists
+            admin = new HBaseAdmin(conf);
+            if (admin.tableExists(tableName)) {
+                if (admin.isTableEnabled(tableName)) {
+                    logger.info("Table " + tableName + " already exists and is enabled, no need to create.");
+                    return 0;
+                } else {
+                    logger.error("Table " + tableName + " is disabled, couldn't append data");
+                    return 1;
+                }
+            }
+        
+            // table doesn't exist, need to create
+
             HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
             HColumnDescriptor cf = new HColumnDescriptor(IIDesc.HBASE_FAMILY);
             cf.setMaxVersions(1);
@@ -100,7 +116,6 @@ public class IICreateHTableJob extends AbstractHadoopJob {
             tableDesc.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis()));
             tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());
 
-            Configuration conf = HBaseConfiguration.create(getConf());
             if (User.isHBaseSecurityEnabled(conf)) {
                 // add coprocessor for bulk load
                 tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
@@ -108,13 +123,7 @@ public class IICreateHTableJob extends AbstractHadoopJob {
 
             IIDeployCoprocessorCLI.deployCoprocessor(tableDesc);
 
-            // drop the table first
-            HBaseAdmin admin = new HBaseAdmin(conf);
-            if (admin.tableExists(tableName)) {
-                admin.disableTable(tableName);
-                admin.deleteTable(tableName);
-            }
-
+          
             // create table
             byte[][] splitKeys = getSplits(sharding);
             if (splitKeys.length == 0)
@@ -126,12 +135,14 @@ public class IICreateHTableJob extends AbstractHadoopJob {
                 }
             }
             System.out.println("create hbase table " + tableName + " done.");
-            admin.close();
 
             return 0;
         } catch (Exception e) {
             printUsage(options);
             throw e;
+        } finally {
+            if (admin != null)
+                admin.close();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
index ff8b659..11c1711 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
@@ -54,7 +54,7 @@ public class HBaseMROutput implements IMROutput {
 
             @Override
             public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
-                steps.addCubingGarbageCollectionSteps(jobFlow);
+                steps.addInvertedIndexGarbageCollectionSteps(jobFlow);
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index 1267d2d..03db6b0 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -184,7 +184,7 @@ public class HBaseMRSteps extends JobBuilderSupport {
     }
 
     public void addSaveIIToHTableSteps(DefaultChainedExecutable jobFlow, String rootPath) {
-        // create htable step
+        // create htable if it doesn't exist
         jobFlow.addTask(createCreateIIHTableStep(seg));
 
         final String iiPath = rootPath + "*";
@@ -198,6 +198,22 @@ public class HBaseMRSteps extends JobBuilderSupport {
     }
 
 
+    public void addInvertedIndexGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
+        String jobId = jobFlow.getId();
+
+        List<String> toDeletePaths = new ArrayList<>();
+        toDeletePaths.add(getJobWorkingDir(jobId));
+
+        HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
+        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+        step.setDeletePaths(toDeletePaths);
+        step.setJobId(jobId);
+
+        jobFlow.addTask(step);
+    }
+
+    
+
     private HadoopShellExecutable createCreateIIHTableStep(IRealizationSegment seg) {
         HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
         createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
index b200c2e..15dc993 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -46,7 +46,6 @@ import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
@@ -108,7 +107,7 @@ public class DeployCoprocessorCLI {
 
     private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
+        Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
         FileSystem fileSystem = FileSystem.get(hconf);
 
         String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
index 88cb7de..2137f57 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
@@ -35,8 +35,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/TsConditionEraserTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/TsConditionEraserTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/TsConditionEraserTest.java
index 335e00c..6c6ed80 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/TsConditionEraserTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/TsConditionEraserTest.java
@@ -6,6 +6,7 @@ import java.util.Arrays;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
@@ -40,7 +41,10 @@ public class TsConditionEraserTest extends LocalFileMetadataTestCase {
     @Before
     public void setup() throws IOException {
         this.createTestMetadata();
-        this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+        IIManager iiManager = IIManager.getInstance(getTestConfig());
+        this.ii = iiManager.getII("test_kylin_ii_left_join");
+        IISegment segment = iiManager.buildSegment(ii, 0, System.currentTimeMillis());
+        ii.getSegments().add(segment);
         this.tableRecordInfo = new TableRecordInfo(ii.getFirstSegment());
         this.factTableDesc = MetadataManager.getInstance(getTestConfig()).getTableDesc("DEFAULT.TEST_KYLIN_FACT");
         this.caldt = this.ii.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", "CAL_DT");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationTest.java
index e271129..8b56605 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationTest.java
@@ -18,20 +18,11 @@
 
 package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
+import com.google.common.collect.Lists;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.metadata.measure.LongMutable;
 import org.apache.kylin.metadata.measure.MeasureAggregator;
@@ -45,7 +36,11 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.*;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 
 /**
  *
@@ -90,6 +85,10 @@ public class EndpointAggregationTest extends LocalFileMetadataTestCase {
     @Test
     public void testSerializeAggregator() {
         final IIInstance ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+        if (ii.getFirstSegment() == null) {
+            IISegment segment = IIManager.getInstance(getTestConfig()).buildSegment(ii, 0, System.currentTimeMillis());
+            ii.getSegments().add(segment);
+        }
         final TableRecordInfo tableRecordInfo = new TableRecordInfo(ii.getFirstSegment());
         final EndpointAggregators endpointAggregators = EndpointAggregators.fromFunctions(tableRecordInfo, buildAggregations());
         byte[] x = EndpointAggregators.serialize(endpointAggregators);
@@ -139,6 +138,10 @@ public class EndpointAggregationTest extends LocalFileMetadataTestCase {
     @Test
     public void basicTest() {
         final IIInstance ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+        if (ii.getFirstSegment() == null) {
+            IISegment segment = IIManager.getInstance(getTestConfig()).buildSegment(ii, 0, System.currentTimeMillis());
+            ii.getSegments().add(segment);
+        }
         final TableRecordInfo tableRecordInfo = new TableRecordInfo(ii.getFirstSegment());
         final EndpointAggregators aggregators = EndpointAggregators.fromFunctions(tableRecordInfo, buildAggregations());
         final EndpointAggregationCache aggCache = new EndpointAggregationCache(aggregators);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TableRecordInfoTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TableRecordInfoTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TableRecordInfoTest.java
index 791002f..3e34495 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TableRecordInfoTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TableRecordInfoTest.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
 import org.junit.After;
@@ -41,6 +42,10 @@ public class TableRecordInfoTest extends LocalFileMetadataTestCase {
     public void setup() throws IOException {
         this.createTestMetadata();
         this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+        if (ii.getFirstSegment() == null) {
+            IISegment segment = IIManager.getInstance(getTestConfig()).buildSegment(ii, 0, System.currentTimeMillis());
+            ii.getSegments().add(segment);
+        }
         this.tableRecordInfo = new TableRecordInfo(ii.getFirstSegment());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TsConditionExtractorTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TsConditionExtractorTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TsConditionExtractorTest.java
index 412e335..4e5a9d9 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TsConditionExtractorTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TsConditionExtractorTest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
@@ -58,6 +59,10 @@ public class TsConditionExtractorTest extends LocalFileMetadataTestCase {
     public void setup() throws IOException {
         this.createTestMetadata();
         this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+        if (ii.getFirstSegment() == null) {
+            IISegment segment = IIManager.getInstance(getTestConfig()).buildSegment(ii, 0, System.currentTimeMillis());
+            ii.getSegments().add(segment);
+        }
         this.tableRecordInfo = new TableRecordInfo(ii.getFirstSegment());
         this.factTableDesc = MetadataManager.getInstance(getTestConfig()).getTableDesc("DEFAULT.TEST_KYLIN_FACT");
         this.calDt = this.ii.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", "CAL_DT");