You are viewing a plain-text version of this content; the canonical link to the original (a hyperlink in the source page) is not preserved in this text.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/11/05 09:13:13 UTC

[4/4] incubator-kylin git commit: KYLIN-1112 Reorganize InvertedIndex source codes into plug-in architecture

KYLIN-1112 Reorganize InvertedIndex source codes into plug-in architecture


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/f8590d25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/f8590d25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/f8590d25

Branch: refs/heads/2.x-staging
Commit: f8590d25e11e10767cebc56e9081e3de520487da
Parents: ca4426c
Author: shaofengshi <sh...@apache.org>
Authored: Thu Oct 29 16:52:17 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Nov 5 16:10:57 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/job/BuildIIWithEngineTest.java |  11 +-
 .../apache/kylin/job/BuildIIWithStreamTest.java |  10 +-
 .../kylin/job/hadoop/invertedindex/IICLI.java   | 106 +++++++++
 .../kylin/job/hadoop/invertedindex/IITest.java  |   2 +-
 .../kylin/common/util/ImplementationSwitch.java |   4 +-
 .../java/org/apache/kylin/cube/CubeSegment.java |  18 +-
 .../kylin/job/constant/ExecutableConstants.java |   1 +
 .../kylin/metadata/model/IEngineAware.java      |   1 +
 .../realization/IRealizationSegment.java        |  20 ++
 .../kylin/storage/cache/StorageMockUtils.java   |   2 +-
 engine-mr/pom.xml                               |  21 +-
 .../kylin/engine/mr/BatchCubingJobBuilder.java  |  23 +-
 .../kylin/engine/mr/BatchCubingJobBuilder2.java |  12 +-
 .../kylin/engine/mr/BatchMergeJobBuilder.java   |  12 +-
 .../kylin/engine/mr/BatchMergeJobBuilder2.java  |  15 +-
 .../org/apache/kylin/engine/mr/IMRInput.java    |   4 +-
 .../org/apache/kylin/engine/mr/IMROutput.java   |  27 +++
 .../kylin/engine/mr/JobBuilderSupport.java      |  46 ++--
 .../java/org/apache/kylin/engine/mr/MRUtil.java |  11 +-
 .../engine/mr/common/AbstractHadoopJob.java     |  22 ++
 .../mr/invertedindex/BatchIIJobBuilder.java     | 132 +++++++++++
 .../CreateInvertedIndexDictionaryJob.java       |  70 ++++++
 .../IIDistinctColumnsCombiner.java              |  58 +++++
 .../mr/invertedindex/IIDistinctColumnsJob.java  | 138 +++++++++++
 .../invertedindex/IIDistinctColumnsMapper.java  |  75 ++++++
 .../invertedindex/IIDistinctColumnsReducer.java |  77 +++++++
 .../kylin/engine/mr/invertedindex/IIJob.java    |  73 ++++++
 .../engine/mr/invertedindex/IIJobBuilder.java   | 219 ++++++++++++++++++
 .../mr/invertedindex/InvertedIndexJob.java      | 148 ++++++++++++
 .../mr/invertedindex/InvertedIndexMapper.java   |  87 +++++++
 .../invertedindex/InvertedIndexPartitioner.java |  73 ++++++
 .../mr/invertedindex/InvertedIndexReducer.java  | 100 ++++++++
 .../UpdateInvertedIndexInfoAfterBuildStep.java  |  93 ++++++++
 .../engine/spark/SparkCubingJobBuilder.java     |   7 +-
 engine-streaming/pom.xml                        |   5 +
 .../streaming/invertedindex/SliceBuilder.java   |  81 +++++++
 examples/test_case_data/sandbox/hbase-site.xml  |  19 +-
 invertedindex/pom.xml                           |  23 +-
 .../apache/kylin/invertedindex/IIInstance.java  |  18 +-
 .../apache/kylin/invertedindex/IISegment.java   |  34 ++-
 .../kylin/invertedindex/model/IIDesc.java       |  32 ++-
 .../invertedindex/model/IIKeyValueCodec.java    |   3 +-
 .../invertedindex/streaming/SliceBuilder.java   |  81 -------
 .../dict/CreateInvertedIndexDictionaryJob.java  |  70 ------
 .../job/hadoop/invertedindex/IIBulkLoadJob.java |  74 ------
 .../hadoop/invertedindex/IICreateHFileJob.java  |  81 -------
 .../invertedindex/IICreateHFileMapper.java      |  55 -----
 .../hadoop/invertedindex/IICreateHTableJob.java | 148 ------------
 .../invertedindex/IIDeployCoprocessorCLI.java   | 157 -------------
 .../IIDistinctColumnsCombiner.java              |  58 -----
 .../invertedindex/IIDistinctColumnsJob.java     | 136 -----------
 .../invertedindex/IIDistinctColumnsMapper.java  |  66 ------
 .../invertedindex/IIDistinctColumnsReducer.java |  77 -------
 .../hadoop/invertedindex/InvertedIndexJob.java  | 164 -------------
 .../invertedindex/InvertedIndexMapper.java      |  90 --------
 .../invertedindex/InvertedIndexPartitioner.java |  73 ------
 .../invertedindex/InvertedIndexReducer.java     | 100 --------
 .../apache/kylin/job/invertedindex/IIJob.java   |  50 ----
 .../kylin/job/invertedindex/IIJobBuilder.java   | 230 -------------------
 .../java/org/apache/kylin/job/tools/IICLI.java  | 106 ---------
 jdbc/kylin_jdbc.log.2014-12-22                  |  18 --
 server/pom.xml                                  |   5 +
 .../apache/kylin/source/hive/HiveMRInput.java   |  13 +-
 .../kylin/storage/hbase/ii/IIBulkLoadJob.java   |  68 ++++++
 .../storage/hbase/ii/IICreateHFileJob.java      |  81 +++++++
 .../storage/hbase/ii/IICreateHFileMapper.java   |  55 +++++
 .../storage/hbase/ii/IICreateHTableJob.java     | 149 ++++++++++++
 .../storage/hbase/steps/HBaseMROutput.java      |  18 ++
 .../kylin/storage/hbase/steps/HBaseMRSteps.java |  94 ++++++--
 .../hbase/util/IIDeployCoprocessorCLI.java      | 157 +++++++++++++
 70 files changed, 2450 insertions(+), 1957 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
index a3b3db2..0158fad 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractKylinTestCase;
 import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.engine.mr.invertedindex.BatchIIJobBuilder;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
@@ -44,8 +45,7 @@ import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
-import org.apache.kylin.job.invertedindex.IIJob;
-import org.apache.kylin.job.invertedindex.IIJobBuilder;
+import org.apache.kylin.engine.mr.invertedindex.IIJob;
 import org.apache.kylin.job.manager.ExecutableManager;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
@@ -93,6 +93,7 @@ public class BuildIIWithEngineTest {
     public static void beforeClass() throws Exception {
         logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
         ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
         if (System.getProperty("hdp.version") == null) {
             throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
         }
@@ -144,7 +145,6 @@ public class BuildIIWithEngineTest {
     }
 
     @Test
-    @Ignore
     public void testBuildII() throws Exception {
 
         String[] testCase = new String[] { "buildIIInnerJoin", "buildIILeftJoin" };
@@ -225,8 +225,9 @@ public class BuildIIWithEngineTest {
         IISegment segment = iiManager.buildSegment(iiInstance, startDate, endDate);
         iiInstance.getSegments().add(segment);
         iiManager.updateII(iiInstance);
-        IIJobBuilder iiJobBuilder = new IIJobBuilder(jobEngineConfig);
-        IIJob job = iiJobBuilder.buildJob(segment, "TEST");
+
+        BatchIIJobBuilder batchIIJobBuilder = new BatchIIJobBuilder(segment, "SYSTEM");
+        IIJob job = batchIIJobBuilder.build();
         jobService.addJob(job);
         waitForJob(job.getId());
         return job.getId();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index 61cf262..b64a7c5 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -40,12 +40,7 @@ import java.io.File;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -66,11 +61,11 @@ import org.apache.kylin.invertedindex.model.IIDesc;
 import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
 import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
 import org.apache.kylin.invertedindex.model.IIRow;
-import org.apache.kylin.invertedindex.streaming.SliceBuilder;
+import org.apache.kylin.engine.streaming.invertedindex.SliceBuilder;
 import org.apache.kylin.job.common.ShellExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
+import org.apache.kylin.storage.hbase.ii.IICreateHTableJob;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.source.hive.HiveTableReader;
@@ -110,7 +105,6 @@ public class BuildIIWithStreamTest {
 
         kylinConfig = KylinConfig.getInstanceFromEnv();
         iiManager = IIManager.getInstance(kylinConfig);
-        iiManager = IIManager.getInstance(kylinConfig);
         for (String iiInstance : II_NAME) {
 
             IIInstance ii = iiManager.getII(iiInstance);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IICLI.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IICLI.java b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IICLI.java
new file mode 100644
index 0000000..7e7be34
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IICLI.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.invertedindex;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.index.RawTableRecord;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.invertedindex.model.IIRow;
+
+/**
+ * @author yangli9
+ */
+public class IICLI {
+
+    public static void main(String[] args) throws IOException {
+        Configuration hconf = HadoopUtil.getCurrentConfiguration();
+        IIManager mgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
+
+        String iiName = args[0];
+        IIInstance ii = mgr.getII(iiName);
+
+        String path = args[1];
+        System.out.println("Reading from " + path + " ...");
+
+        TableRecordInfo info = new TableRecordInfo(ii.getFirstSegment());
+        IIKeyValueCodec codec = new IIKeyValueCodec(info.getDigest());
+        int count = 0;
+        for (Slice slice : codec.decodeKeyValue(readSequenceKVs(hconf, path))) {
+            for (RawTableRecord rec : slice) {
+                System.out.printf(new TableRecord(rec, info).toString());
+                count++;
+            }
+        }
+        System.out.println("Total " + count + " records");
+    }
+
+    public static Iterable<IIRow> readSequenceKVs(Configuration hconf, String path) throws IOException {
+        final Reader reader = new Reader(hconf, SequenceFile.Reader.file(new Path(path)));
+        return new Iterable<IIRow>() {
+            @Override
+            public Iterator<IIRow> iterator() {
+                return new Iterator<IIRow>() {
+                    ImmutableBytesWritable k = new ImmutableBytesWritable();
+                    ImmutableBytesWritable v = new ImmutableBytesWritable();
+                    IIRow pair = new IIRow(k, v, null);
+
+                    @Override
+                    public boolean hasNext() {
+                        boolean hasNext = false;
+                        try {
+                            hasNext = reader.next(k, v);
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        } finally {
+                            if (hasNext == false) {
+                                IOUtils.closeQuietly(reader);
+                            }
+                        }
+                        return hasNext;
+                    }
+
+                    @Override
+                    public IIRow next() {
+                        return pair;
+                    }
+
+                    @Override
+                    public void remove() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index e113c06..200156a 100644
--- a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -35,7 +35,7 @@ import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
 import org.apache.kylin.invertedindex.model.IIKeyValueCodecWithState;
 import org.apache.kylin.invertedindex.model.IIRow;
 import org.apache.kylin.invertedindex.model.KeyValueCodec;
-import org.apache.kylin.invertedindex.streaming.SliceBuilder;
+import org.apache.kylin.engine.streaming.invertedindex.SliceBuilder;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.ConstantTupleFilter;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java b/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java
index f6924a0..4a47b83 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java
@@ -44,7 +44,7 @@ public class ImplementationSwitch<I> {
             maxId = Math.max(maxId, id);
         }
         if (maxId > 100)
-            throw new IllegalArgumentException("you have more than 100 implentations?");
+            throw new IllegalArgumentException("you have more than 100 implementations?");
 
         Object[] result = new Object[maxId + 1];
 
@@ -65,7 +65,7 @@ public class ImplementationSwitch<I> {
         I result = (I) instances[id];
 
         if (result == null)
-            throw new IllegalArgumentException("Implementations missing, ID " + id + ", interafce " + interfaceClz.getName());
+            throw new IllegalArgumentException("Implementations missing, ID " + id + ", interface " + interfaceClz.getName());
 
         return result;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 1a44fcf..7d17d30 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -27,9 +27,10 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.ShardingHash;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.IDictionaryAware;
-import org.apache.kylin.metadata.model.IBuildable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 
@@ -39,9 +40,11 @@ import com.fasterxml.jackson.annotation.JsonBackReference;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Objects;
 import com.google.common.collect.Maps;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.IRealizationSegment;
 
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, IBuildable {
+public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, IRealizationSegment {
 
     @JsonBackReference
     private CubeInstance cubeInstance;
@@ -115,6 +118,7 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
         this.uuid = id;
     }
 
+    @Override
     public String getName() {
         return name;
     }
@@ -211,6 +215,7 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
         this.cubeInstance = cubeInstance;
     }
 
+    @Override
     public String getStorageLocationIdentifier() {
 
         return storageLocationIdentifier;
@@ -410,4 +415,13 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
         return ret;
     }
 
+    @Override
+    public IRealization getRealization() {
+        return cubeInstance;
+    }
+
+    @Override
+    public IJoinedFlatTableDesc getJoinedFlatTableDesc() {
+        return new CubeJoinedFlatTableDesc(this.getCubeDesc(), this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 30cefaf..ba50880 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -59,6 +59,7 @@ public final class ExecutableConstants {
 
     public static final String STEP_NAME_BUILD_II = "Build Inverted Index";
     public static final String STEP_NAME_CONVERT_II_TO_HFILE = "Convert Inverted Index Data to HFile";
+    public static final String STEP_NAME_UPDATE_II_INFO = "Update Inverted Index Info";
 
     public static final String PROP_ENGINE_CONTEXT = "jobengineConfig";
     public static final String PROP_JOB_FLOW = "jobFlow";

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/core-metadata/src/main/java/org/apache/kylin/metadata/model/IEngineAware.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IEngineAware.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IEngineAware.java
index 60bd825..882b2e3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IEngineAware.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IEngineAware.java
@@ -22,6 +22,7 @@ public interface IEngineAware {
 
     public static final int ID_MR_V1 = 0;
     public static final int ID_MR_V2 = 2;
+    public static final int ID_MR_II = 3;
     public static final int ID_SPARK = 5;
 
     int getEngineType();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationSegment.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationSegment.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationSegment.java
new file mode 100644
index 0000000..afab86b
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationSegment.java
@@ -0,0 +1,20 @@
+package org.apache.kylin.metadata.realization;
+
+import org.apache.kylin.metadata.model.IBuildable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+
+/**
+ * Created by shaoshi on 10/30/15.
+ */
+public interface IRealizationSegment extends IBuildable {
+
+    public String getUuid();
+    
+    public String getName();
+
+    public String getStorageLocationIdentifier();
+    
+    public IRealization getRealization();
+    
+    public IJoinedFlatTableDesc getJoinedFlatTableDesc();
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java b/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java
index 1bf8e6a..2b5ceee 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java
@@ -100,7 +100,7 @@ public class StorageMockUtils {
     }
 
     public static CompareTupleFilter buildFilter1(TblColRef column) {
-        CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.EQ);
+        CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.LTE);
         ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column);
         compareFilter.addChild(columnFilter1);
         ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2012-05-23");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/engine-mr/pom.xml
----------------------------------------------------------------------
diff --git a/engine-mr/pom.xml b/engine-mr/pom.xml
index 7a2bfe5..4500555 100644
--- a/engine-mr/pom.xml
+++ b/engine-mr/pom.xml
@@ -34,7 +34,14 @@
     </properties>
 
     <dependencies>
-    
+
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-common</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+            <version>${project.parent.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-core-cube</artifactId>
@@ -42,23 +49,21 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-core-job</artifactId>
+            <artifactId>kylin-invertedindex</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-core-storage</artifactId>
+            <artifactId>kylin-core-job</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
-
-        <!-- Env & Test -->
         <dependency>
             <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-core-common</artifactId>
-            <type>test-jar</type>
-            <scope>test</scope>
+            <artifactId>kylin-core-storage</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
+
+        <!-- Env & Test -->
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index d00f592..dcb887d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.engine.mr;
 
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.RowKeyDesc;
 import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
 import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
@@ -38,13 +39,12 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
     public BatchCubingJobBuilder(CubeSegment newSegment, String submitter) {
         super(newSegment, submitter);
         this.inputSide = MRUtil.getBatchCubingInputSide(seg);
-        this.outputSide = MRUtil.getBatchCubingOutputSide(seg);
+        this.outputSide = MRUtil.getBatchCubingOutputSide((CubeSegment)seg);
     }
 
     public CubingJob build() {
         logger.info("MR_V1 new job to BUILD segment " + seg);
-        
-        final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
+        final CubingJob result = CubingJob.createBuildJob((CubeSegment)seg, submitter, config);
         final String jobId = result.getId();
         final String cuboidRootPath = getCuboidRootPath(jobId);
 
@@ -56,8 +56,9 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
         result.addTask(createBuildDictionaryStep(jobId));
 
         // Phase 3: Build Cube
-        final int groupRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getNCuboidBuildLevels();
-        final int totalRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
+        RowKeyDesc rowKeyDesc = ((CubeSegment)seg).getCubeDesc().getRowkey();
+        final int groupRowkeyColumnsCount = rowKeyDesc.getNCuboidBuildLevels();
+        final int totalRowkeyColumnsCount = rowKeyDesc.getRowKeyColumns().length;
         final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount);
         // base cuboid step
         result.addTask(createBaseCuboidStep(cuboidOutputTempPath, jobId));
@@ -81,15 +82,15 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
         MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
 
         StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, seg);
+        appendMapReduceParameters(cmd, ((CubeSegment)seg).getCubeDesc().getModel());
 
         baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID);
 
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
         appendExecCmdParameters(cmd, "segmentname", seg.getName());
         appendExecCmdParameters(cmd, "input", "FLAT_TABLE"); // marks flat table input
         appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]);
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName());
         appendExecCmdParameters(cmd, "level", "0");
 
         baseCuboidStep.setMapReduceParams(cmd.toString());
@@ -105,12 +106,12 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
         ndCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_N_D_CUBOID + " : " + dimNum + "-Dimension");
         StringBuilder cmd = new StringBuilder();
 
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendMapReduceParameters(cmd, ((CubeSegment)seg).getCubeDesc().getModel());
+        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
         appendExecCmdParameters(cmd, "segmentname", seg.getName());
         appendExecCmdParameters(cmd, "input", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]);
         appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]);
-        appendExecCmdParameters(cmd, "jobname", "Kylin_ND-Cuboid_Builder_" + seg.getCubeInstance().getName() + "_Step");
+        appendExecCmdParameters(cmd, "jobname", "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step");
         appendExecCmdParameters(cmd, "level", "" + (totalRowkeyColumnCount - dimNum));
 
         ndCuboidStep.setMapReduceParams(cmd.toString());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index fe9f1d6..f8fbc33 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -37,13 +37,13 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
     public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
         super(newSegment, submitter);
         this.inputSide = MRUtil.getBatchCubingInputSide(seg);
-        this.outputSide = MRUtil.getBatchCubingOutputSide2(seg);
+        this.outputSide = MRUtil.getBatchCubingOutputSide2((CubeSegment)seg);
     }
 
     public CubingJob build() {
         logger.info("MR_V2 new job to BUILD segment " + seg);
         
-        final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
+        final CubingJob result = CubingJob.createBuildJob((CubeSegment)seg, submitter, config);
         final String jobId = result.getId();
 
         // Phase 1: Create Flat Table
@@ -70,7 +70,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
     private SaveStatisticsStep createSaveStatisticsStep(String jobId) {
         SaveStatisticsStep result = new SaveStatisticsStep();
         result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
-        result.setCubeName(seg.getCubeInstance().getName());
+        result.setCubeName(seg.getRealization().getName());
         result.setSegmentId(seg.getUuid());
         result.setStatisticsPath(getStatisticsPath(jobId));
         return result;
@@ -81,13 +81,13 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         MapReduceExecutable cubeStep = new MapReduceExecutable();
 
         StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, seg);
+        appendMapReduceParameters(cmd, ((CubeSegment)seg).getCubeDesc().getModel());
 
         cubeStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
 
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
         appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getRealization().getName());
         appendExecCmdParameters(cmd, "jobflowid", jobId);
 
         cubeStep.setMapReduceParams(cmd.toString());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
index bc377ed..4b93b5d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
@@ -39,17 +39,17 @@ public class BatchMergeJobBuilder extends JobBuilderSupport {
 
     public BatchMergeJobBuilder(CubeSegment mergeSegment, String submitter) {
         super(mergeSegment, submitter);
-        this.outputSide = MRUtil.getBatchMergeOutputSide(seg);
+        this.outputSide = MRUtil.getBatchMergeOutputSide((CubeSegment)seg);
     }
 
     public CubingJob build() {
         logger.info("MR_V1 new job to MERGE segment " + seg);
-        
-        final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
+        final CubeSegment cubeSegment = (CubeSegment)seg;
+        final CubingJob result = CubingJob.createMergeJob(cubeSegment, submitter, config);
         final String jobId = result.getId();
         final String cuboidRootPath = getCuboidRootPath(jobId);
 
-        final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+        final List<CubeSegment> mergingSegments = cubeSegment.getCubeInstance().getMergingSegments(cubeSegment);
         Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
         final List<String> mergingSegmentIds = Lists.newArrayList();
         final List<String> mergingCuboidPaths = Lists.newArrayList();
@@ -63,7 +63,7 @@ public class BatchMergeJobBuilder extends JobBuilderSupport {
 
         // Phase 2: Merge Cube Files
         String formattedPath = StringUtil.join(mergingCuboidPaths, ",");
-        result.addTask(createMergeCuboidDataStep(seg, formattedPath, cuboidRootPath));
+        result.addTask(createMergeCuboidDataStep(cubeSegment, formattedPath, cuboidRootPath));
         outputSide.addStepPhase2_BuildCube(result, cuboidRootPath);
 
         // Phase 3: Update Metadata & Cleanup
@@ -78,7 +78,7 @@ public class BatchMergeJobBuilder extends JobBuilderSupport {
         mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
         StringBuilder cmd = new StringBuilder();
 
-        appendMapReduceParameters(cmd, seg);
+        appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc());
         appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
         appendExecCmdParameters(cmd, "segmentname", seg.getName());
         appendExecCmdParameters(cmd, "input", inputPath);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index 443cf95..48a717f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -39,16 +39,17 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
 
     public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
         super(mergeSegment, submitter);
-        this.outputSide = MRUtil.getBatchMergeOutputSide2(seg);
+        this.outputSide = MRUtil.getBatchMergeOutputSide2((CubeSegment)seg);
     }
 
     public CubingJob build() {
         logger.info("MR_V2 new job to MERGE segment " + seg);
         
-        final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
+        final CubeSegment cubeSegment = (CubeSegment)seg;
+        final CubingJob result = CubingJob.createMergeJob(cubeSegment, submitter, config);
         final String jobId = result.getId();
 
-        final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+        final List<CubeSegment> mergingSegments = cubeSegment.getCubeInstance().getMergingSegments(cubeSegment);
         Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
         final List<String> mergingSegmentIds = Lists.newArrayList();
         final List<String> mergingHTables = Lists.newArrayList();
@@ -59,7 +60,7 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
 
         // Phase 1: Merge Dictionary
         result.addTask(createMergeDictionaryStep(mergingSegmentIds));
-        result.addTask(createMergeStatisticsStep(seg, mergingSegmentIds, getStatisticsPath(jobId)));
+        result.addTask(createMergeStatisticsStep(cubeSegment, mergingSegmentIds, getStatisticsPath(jobId)));
         outputSide.addStepPhase1_MergeDictionary(result);
 
         // Phase 2: Merge Cube
@@ -89,10 +90,10 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
         mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
         StringBuilder cmd = new StringBuilder();
 
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendMapReduceParameters(cmd, ((CubeSegment)seg).getCubeDesc().getModel());
+        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
         appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getRealization().getName() + "_Step");
         appendExecCmdParameters(cmd, "jobflowid", jobId);
 
         mergeCuboidDataStep.setMapReduceParams(cmd.toString());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
index 336a66f..a61e0dd 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -19,9 +19,9 @@
 package org.apache.kylin.engine.mr;
 
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.realization.IRealizationSegment;
 
 /**
  * Any ITableSource that wishes to serve as input of MapReduce build engine must adapt to this interface.
@@ -29,7 +29,7 @@ import org.apache.kylin.metadata.model.TableDesc;
 public interface IMRInput {
 
     /** Return a helper to participate in batch cubing job flow. */
-    public IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg);
+    public IMRBatchCubingInputSide getBatchCubingInputSide(IRealizationSegment seg);
 
     /** Return an InputFormat that reads from specified table. */
     public IMRTableInputFormat getTableInputFormat(TableDesc table);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
index 577a836..e989042 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.engine.mr;
 
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 
 public interface IMROutput {
@@ -26,6 +27,10 @@ public interface IMROutput {
     /** Return a helper to participate in batch cubing job flow. */
     public IMRBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg);
 
+
+    /** Return a helper to participate in batch inverted indexing job flow. */
+    public IMRBatchInvertedIndexingOutputSide getBatchInvertedIndexingOutputSide(IISegment seg);
+
     /**
      * Participate the batch cubing flow as the output side. Responsible for saving
      * the cuboid output to storage (Phase 3).
@@ -75,4 +80,26 @@ public interface IMROutput {
         /** Add step that does any necessary clean up. */
         public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
     }
+
+
+    /**
+     * Participate in the batch inverted indexing flow as the output side. Responsible for saving
+     * the II output to storage (Phase 3).
+     *
+     * - Phase 1: Create Flat Table
+     * - Phase 2: Build Dictionary
+     * - Phase 3: Build II
+     * - Phase 4: Update Metadata & Cleanup
+     */
+    public interface IMRBatchInvertedIndexingOutputSide {
+
+        /**
+         * Add step that saves II output from HDFS to storage.
+         * @param rootPath job working-directory path that holds the II MapReduce output
+         */
+        public void addStepPhase3_BuildII(DefaultChainedExecutable jobFlow, String rootPath);
+
+        /** Add step that does any necessary clean up. */
+        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 5fba37c..8c770f9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -25,15 +25,19 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.invertedindex.UpdateInvertedIndexInfoAfterBuildStep;
 import org.apache.kylin.engine.mr.steps.CreateDictionaryJob;
 import org.apache.kylin.engine.mr.steps.FactDistinctColumnsJob;
 import org.apache.kylin.engine.mr.steps.MergeDictionaryStep;
 import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterBuildStep;
 import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterMergeStep;
+import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
 
 import com.google.common.base.Preconditions;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.realization.IRealizationSegment;
 
 /**
  * Hold reusable steps for builders.
@@ -41,10 +45,10 @@ import com.google.common.base.Preconditions;
 public class JobBuilderSupport {
 
     final protected JobEngineConfig config;
-    final protected CubeSegment seg;
+    final protected IRealizationSegment seg;
     final protected String submitter;
 
-    public JobBuilderSupport(CubeSegment seg, String submitter) {
+    public JobBuilderSupport(IRealizationSegment seg, String submitter) {
         Preconditions.checkNotNull(seg, "segment cannot be null");
         this.config = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
         this.seg = seg;
@@ -64,14 +68,14 @@ public class JobBuilderSupport {
         result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
         result.setMapReduceJobClass(FactDistinctColumnsJob.class);
         StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendMapReduceParameters(cmd, ((CubeSegment)seg).getCubeDesc().getModel());
+        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
         appendExecCmdParameters(cmd, "output", getFactDistinctColumnsPath(jobId));
         appendExecCmdParameters(cmd, "segmentname", seg.getName());
         appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(withStats));
         appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
         appendExecCmdParameters(cmd, "statisticssamplingpercent", String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getCubeInstance().getName() + "_Step");
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getRealization().getName() + "_Step");
 
         result.setMapReduceParams(cmd.toString());
         return result;
@@ -82,7 +86,7 @@ public class JobBuilderSupport {
         HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
         buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
         StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
         appendExecCmdParameters(cmd, "segmentname", seg.getName());
         appendExecCmdParameters(cmd, "input", getFactDistinctColumnsPath(jobId));
 
@@ -94,7 +98,7 @@ public class JobBuilderSupport {
     public UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(String jobId) {
         final UpdateCubeInfoAfterBuildStep updateCubeInfoStep = new UpdateCubeInfoAfterBuildStep();
         updateCubeInfoStep.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
-        updateCubeInfoStep.setCubeName(seg.getCubeInstance().getName());
+        updateCubeInfoStep.setCubeName(seg.getRealization().getName());
         updateCubeInfoStep.setSegmentId(seg.getUuid());
         updateCubeInfoStep.setCubingJobId(jobId);
         return updateCubeInfoStep;
@@ -103,7 +107,7 @@ public class JobBuilderSupport {
     public MergeDictionaryStep createMergeDictionaryStep(List<String> mergingSegmentIds) {
         MergeDictionaryStep result = new MergeDictionaryStep();
         result.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
-        result.setCubeName(seg.getCubeInstance().getName());
+        result.setCubeName(seg.getRealization().getName());
         result.setSegmentId(seg.getUuid());
         result.setMergingSegmentIds(mergingSegmentIds);
         return result;
@@ -112,30 +116,44 @@ public class JobBuilderSupport {
     public UpdateCubeInfoAfterMergeStep createUpdateCubeInfoAfterMergeStep(List<String> mergingSegmentIds, String jobId) {
         UpdateCubeInfoAfterMergeStep result = new UpdateCubeInfoAfterMergeStep();
         result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
-        result.setCubeName(seg.getCubeInstance().getName());
+        result.setCubeName(seg.getRealization().getName());
         result.setSegmentId(seg.getUuid());
         result.setMergingSegmentIds(mergingSegmentIds);
         result.setCubingJobId(jobId);
         return result;
     }
 
+
+
+    public UpdateInvertedIndexInfoAfterBuildStep createUpdateInvertedIndexInfoAfterBuildStep(String jobId) {
+        final UpdateInvertedIndexInfoAfterBuildStep updateIIInfoStep = new UpdateInvertedIndexInfoAfterBuildStep();
+        updateIIInfoStep.setName(ExecutableConstants.STEP_NAME_UPDATE_II_INFO);
+        updateIIInfoStep.setInvertedIndexName(seg.getRealization().getName());
+        updateIIInfoStep.setSegmentId(seg.getUuid());
+        updateIIInfoStep.setJobId(jobId);
+        return updateIIInfoStep;
+    }
+
     // ============================================================================
 
     public String getJobWorkingDir(String jobId) {
         return getJobWorkingDir(config, jobId);
     }
 
+    public String getRealizationRootPath(String jobId) {
+        return getJobWorkingDir(jobId) + "/" + seg.getRealization().getName();
+    }
     public String getCuboidRootPath(String jobId) {
-        return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/cuboid/";
+        return getRealizationRootPath(jobId) + "/cuboid/";
     }
 
     public String getCuboidRootPath(CubeSegment seg) {
         return getCuboidRootPath(seg.getLastBuildJobID());
     }
 
-    public void appendMapReduceParameters(StringBuilder buf, CubeSegment seg) {
+    public void appendMapReduceParameters(StringBuilder buf, DataModelDesc modelDesc) {
         try {
-            String jobConf = config.getHadoopJobConfFilePath(seg.getCubeDesc().getModel().getCapacity());
+            String jobConf = config.getHadoopJobConfFilePath(modelDesc.getCapacity());
             if (jobConf != null && jobConf.length() > 0) {
                 buf.append(" -conf ").append(jobConf);
             }
@@ -145,11 +163,11 @@ public class JobBuilderSupport {
     }
 
     public String getFactDistinctColumnsPath(String jobId) {
-        return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/fact_distinct_columns";
+        return getRealizationRootPath(jobId) + "/fact_distinct_columns";
     }
 
     public String getStatisticsPath(String jobId) {
-        return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/statistics";
+        return getRealizationRootPath(jobId) + "/statistics";
     }
 
     // ============================================================================

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 9a1c1f5..55fa9e2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -9,17 +9,20 @@ import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
 import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
 import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeInputSide2;
 import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2;
+import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.IRealizationSegment;
 import org.apache.kylin.source.SourceFactory;
 import org.apache.kylin.storage.StorageFactory;
 
 public class MRUtil {
 
-    public static IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
+    public static IMRBatchCubingInputSide getBatchCubingInputSide(IRealizationSegment seg) {
         return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(seg);
     }
-
+    
     public static IMRTableInputFormat getTableInputFormat(String tableName) {
         return getTableInputFormat(getTableDesc(tableName));
     }
@@ -52,4 +55,8 @@ public class MRUtil {
         return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchMergeOutputSide(seg);
     }
 
+    public static IMROutput.IMRBatchInvertedIndexingOutputSide getBatchInvertedIndexingOutputSide(IISegment seg) {
+        return StorageFactory.createEngineAdapter(seg, IMROutput.class).getBatchInvertedIndexingOutputSide(seg);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index ffa7c13..8782bbe 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -58,6 +58,8 @@ import org.apache.kylin.common.util.StringSplitter;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.job.exception.JobException;
@@ -322,6 +324,26 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         attachKylinPropsAndMetadata(dumpList, conf);
     }
 
+    protected void attachKylinPropsAndMetadata(IIInstance ii, Configuration conf) throws IOException {
+        MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+
+        // write II / model_desc / II_desc / dict / table
+        ArrayList<String> dumpList = new ArrayList<String>();
+        dumpList.add(ii.getResourcePath());
+        dumpList.add(ii.getDescriptor().getModel().getResourcePath());
+        dumpList.add(ii.getDescriptor().getResourcePath());
+
+        for (String tableName : ii.getDescriptor().getModel().getAllTables()) {
+            TableDesc table = metaMgr.getTableDesc(tableName);
+            dumpList.add(table.getResourcePath());
+        }
+        for (IISegment segment : ii.getSegments()) {
+            dumpList.addAll(segment.getDictionaryPaths());
+        }
+
+        attachKylinPropsAndMetadata(dumpList, conf);
+    }
+
     protected void attachKylinPropsAndMetadata(ArrayList<String> dumpList, Configuration conf) throws IOException {
         File tmp = File.createTempFile("kylin_job_meta", "");
         FileUtils.forceDelete(tmp); // we need a directory, so delete the file first

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java
new file mode 100644
index 0000000..97e27d0
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
+import org.apache.kylin.engine.mr.IMROutput;
+import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.BaseCuboidJob;
+import org.apache.kylin.engine.mr.steps.NDCuboidJob;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.IRealizationSegment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class BatchIIJobBuilder extends JobBuilderSupport {
+    // Assembles the MR job flow for a new II segment: flat table, dictionary, II build, metadata update & cleanup.
+    private static final Logger logger = LoggerFactory.getLogger(BatchIIJobBuilder.class);
+    
+    private final IMRBatchCubingInputSide inputSide;
+    private final IMROutput.IMRBatchInvertedIndexingOutputSide outputSide;
+
+    public BatchIIJobBuilder(IISegment newSegment, String submitter) {
+        super(newSegment, submitter);
+        this.inputSide = MRUtil.getBatchCubingInputSide(newSegment);
+        this.outputSide = MRUtil.getBatchInvertedIndexingOutputSide(newSegment);
+    }
+
+    public IIJob build() {
+        logger.info("MR new job to BUILD segment " + seg);
+
+        final IIJob result = IIJob.createBuildJob((IISegment)seg, submitter, config);
+        final String jobId = result.getId();
+        
+        final String iiRootPath = getRealizationRootPath(jobId) + "/";
+        // Phase 1: Create Flat Table
+        inputSide.addStepPhase1_CreateFlatTable(result);
+
+        final String intermediateTableIdentity = seg.getJoinedFlatTableDesc().getTableName();
+        // Phase 2: Build Dictionary
+        result.addTask(createIIFactDistinctColumnsStep(seg, intermediateTableIdentity, getFactDistinctColumnsPath(jobId)));
+        result.addTask(createIIBuildDictionaryStep(seg, getFactDistinctColumnsPath(jobId)));
+
+        // Phase 3: Build II (MR output under iiRootPath, then handed to the output side)
+        result.addTask(createInvertedIndexStep((IISegment)seg, intermediateTableIdentity, iiRootPath));
+        outputSide.addStepPhase3_BuildII(result, iiRootPath);
+
+        // Phase 4: Update Metadata & Cleanup
+        result.addTask(createUpdateInvertedIndexInfoAfterBuildStep(jobId));
+        inputSide.addStepPhase4_Cleanup(result);
+        outputSide.addStepPhase4_Cleanup(result);
+
+        return result;
+    }
+
+    private MapReduceExecutable createIIFactDistinctColumnsStep(IRealizationSegment seg, String factTableName, String output) {
+        MapReduceExecutable result = new MapReduceExecutable();
+        result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
+        result.setMapReduceJobClass(IIDistinctColumnsJob.class);
+        StringBuilder cmd = new StringBuilder();
+        appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc());
+        appendExecCmdParameters(cmd, "tablename", factTableName);
+        appendExecCmdParameters(cmd, "iiname", seg.getRealization().getName());
+        appendExecCmdParameters(cmd, "output", output);
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getRealization().getName() + "_Step");
+
+        result.setMapReduceParams(cmd.toString());
+        return result;
+    }
+
+    private HadoopShellExecutable createIIBuildDictionaryStep(IRealizationSegment seg, String factDistinctColumnsPath) {
+        // shell step running CreateInvertedIndexDictionaryJob over the fact distinct-columns output
+        HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
+        buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
+        StringBuilder cmd = new StringBuilder();
+        appendExecCmdParameters(cmd, "iiname", seg.getRealization().getName());
+        appendExecCmdParameters(cmd, "input", factDistinctColumnsPath);
+
+        buildDictionaryStep.setJobParams(cmd.toString());
+        buildDictionaryStep.setJobClass(CreateInvertedIndexDictionaryJob.class);
+        return buildDictionaryStep;
+    }
+
+    private MapReduceExecutable createInvertedIndexStep(IISegment seg, String intermediateHiveTable, String iiOutputTempPath) {
+        // MR step running InvertedIndexJob over the flat table, writing to iiOutputTempPath
+        MapReduceExecutable buildIIStep = new MapReduceExecutable();
+
+        StringBuilder cmd = new StringBuilder();
+        appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc());
+
+        buildIIStep.setName(ExecutableConstants.STEP_NAME_BUILD_II);
+
+        appendExecCmdParameters(cmd, "iiname", seg.getRealization().getName());
+        appendExecCmdParameters(cmd, "tablename", intermediateHiveTable);
+        appendExecCmdParameters(cmd, "output", iiOutputTempPath);
+        appendExecCmdParameters(cmd, "jobname", ExecutableConstants.STEP_NAME_BUILD_II);
+
+        buildIIStep.setMapReduceParams(cmd.toString());
+        buildIIStep.setMapReduceJobClass(InvertedIndexJob.class);
+        return buildIIStep;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/CreateInvertedIndexDictionaryJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/CreateInvertedIndexDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/CreateInvertedIndexDictionaryJob.java
new file mode 100644
index 0000000..39d74b4
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/CreateInvertedIndexDictionaryJob.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.dict.DistinctColumnValuesProvider;
+import org.apache.kylin.engine.mr.DFSFileTable;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.ReadableTable;
+
+/**
+ * Hadoop tool that builds the dictionaries of an inverted index (II) from the
+ * distinct-value files produced by an earlier step.
+ * <p>
+ * Options: the II name ({@code OPTION_II_NAME}) and the input directory
+ * ({@code OPTION_INPUT_PATH}) that holds one distinct-value file per column,
+ * named after the column.
+ */
+public class CreateInvertedIndexDictionaryJob extends AbstractHadoopJob {
+
+    /**
+     * Parses the options, resolves the II and builds dictionaries for its first segment.
+     *
+     * @param args command-line arguments
+     * @return 0 on success
+     * @throws IllegalArgumentException if no II exists with the given name
+     * @throws IllegalStateException    if the II has no segment
+     * @throws Exception                any other failure; usage is printed before rethrowing
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_II_NAME);
+            options.addOption(OPTION_INPUT_PATH);
+            parseOptions(options, args);
+
+            final String iiname = getOptionValue(OPTION_II_NAME);
+            final String factColumnsInputPath = getOptionValue(OPTION_INPUT_PATH);
+            final KylinConfig config = KylinConfig.getInstanceFromEnv();
+
+            IIManager mgr = IIManager.getInstance(config);
+            IIInstance ii = mgr.getII(iiname);
+            // Fail with a descriptive message instead of a bare NullPointerException
+            // when the name does not resolve or the II has nothing to build.
+            if (ii == null) {
+                throw new IllegalArgumentException("No inverted index found with name '" + iiname + "'");
+            }
+            if (ii.getFirstSegment() == null) {
+                throw new IllegalStateException("Inverted index '" + iiname + "' has no segment to build dictionaries for");
+            }
+
+            mgr.buildInvertedIndexDictionary(ii.getFirstSegment(), new DistinctColumnValuesProvider() {
+                @Override
+                public ReadableTable getDistinctValuesFor(TblColRef col) {
+                    // one file per column, named after the column, under the input directory
+                    return new DFSFileTable(factColumnsInputPath + "/" + col.getName(), -1);
+                }
+            });
+            return 0;
+        } catch (Exception e) {
+            printUsage(options);
+            throw e;
+        }
+    }
+
+    /** Command-line entry point; exits the JVM with the tool's return code. */
+    public static void main(String[] args) throws Exception {
+        int exitCode = ToolRunner.run(new CreateInvertedIndexDictionaryJob(), args);
+        System.exit(exitCode);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsCombiner.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsCombiner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsCombiner.java
new file mode 100644
index 0000000..651ad63
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsCombiner.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import java.io.IOException;
+import java.util.HashSet;
+
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.engine.mr.KylinReducer;
+
+/**
+ * Combiner that removes duplicate values per column index before the shuffle.
+ * Each distinct value is forwarded once with its original key.
+ *
+ * @author yangli9
+ */
+public class IIDistinctColumnsCombiner extends KylinReducer<ShortWritable, Text, ShortWritable, Text> {
+
+    /** Reused output holder; refilled before every write. */
+    private Text outputValue = new Text();
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+    }
+
+    /** Emits each distinct value seen for {@code key} exactly once. */
+    @Override
+    public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+        for (ByteArray distinct : collectDistinct(values)) {
+            outputValue.set(distinct.array(), distinct.offset(), distinct.length());
+            context.write(key, outputValue);
+        }
+    }
+
+    /**
+     * Copies each incoming value into a set. A copy of the first {@code getLength()}
+     * bytes is taken because {@link Text#getBytes()} may return a larger backing
+     * buffer, and the framework may reuse the same {@link Text} instance across iterations.
+     */
+    private static HashSet<ByteArray> collectDistinct(Iterable<Text> values) {
+        HashSet<ByteArray> distinct = new HashSet<ByteArray>();
+        for (Text text : values) {
+            distinct.add(new ByteArray(Bytes.copy(text.getBytes(), 0, text.getLength())));
+        }
+        return distinct;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsJob.java
new file mode 100644
index 0000000..fe968b1
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsJob.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import java.io.IOException;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.IntermediateColumnDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Hadoop tool that extracts the distinct values of every flat-table column of an
+ * inverted index (II).
+ * <p>
+ * Map output is keyed by column position and pre-deduplicated by
+ * {@link IIDistinctColumnsCombiner}. A single {@link IIDistinctColumnsReducer}
+ * then writes the distinct values of each column under the output path.
+ *
+ * @author yangli9
+ */
+public class IIDistinctColumnsJob extends AbstractHadoopJob {
+    protected static final Logger logger = LoggerFactory.getLogger(IIDistinctColumnsJob.class);
+
+    /**
+     * Parses the options, configures the MapReduce job for the II's first segment and runs it.
+     *
+     * @param args command-line arguments
+     * @return the result of {@code waitForCompletion(job)}
+     * @throws IllegalArgumentException if no II exists with the given name
+     * @throws IllegalStateException    if the II has no segment
+     * @throws Exception                any other failure; usage is printed before rethrowing
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_TABLE_NAME);
+            options.addOption(OPTION_II_NAME);
+            options.addOption(OPTION_OUTPUT_PATH);
+            parseOptions(options, args);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            // NOTE(review): toUpperCase() is locale-sensitive (default JVM locale)
+            String tableName = getOptionValue(OPTION_TABLE_NAME).toUpperCase();
+            String iiName = getOptionValue(OPTION_II_NAME);
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+
+            // ----------------------------------------------------------------------------
+
+            logger.info("Starting: {} on table {}", job.getJobName(), tableName);
+
+            IIManager iiMgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
+            IIInstance ii = iiMgr.getII(iiName);
+            // Fail fast with a descriptive message. Otherwise an unknown name or a
+            // segment-less II only surfaces later as a bare NullPointerException.
+            if (ii == null) {
+                throw new IllegalArgumentException("No inverted index found with name '" + iiName + "'");
+            }
+            IISegment segment = ii.getFirstSegment();
+            if (segment == null) {
+                throw new IllegalStateException("Inverted index '" + iiName + "' has no segment to process");
+            }
+
+            job.getConfiguration().set(BatchConstants.TABLE_NAME, tableName);
+            job.getConfiguration().set(BatchConstants.TABLE_COLUMNS, getColumns(ii));
+
+            setJobClasspath(job);
+
+            setupMapper(segment);
+            setupReducer(output);
+
+            Configuration conf = job.getConfiguration();
+            // read back by IIDistinctColumnsMapper.setup() to locate the II
+            conf.set(BatchConstants.CFG_II_NAME, ii.getName());
+            attachKylinPropsAndMetadata(ii, conf);
+            return waitForCompletion(job);
+
+        } catch (Exception e) {
+            printUsage(options);
+            throw e;
+        }
+
+    }
+
+    /**
+     * Returns the column names of the II's flat table joined with ",", in
+     * {@code getColumnList()} order. Consumers index into this list by column position.
+     */
+    private String getColumns(IIInstance ii) {
+        IIJoinedFlatTableDesc iiflat = new IIJoinedFlatTableDesc(ii.getDescriptor());
+        StringBuilder buf = new StringBuilder();
+        for (IntermediateColumnDesc col : iiflat.getColumnList()) {
+            if (buf.length() > 0)
+                buf.append(",");
+            buf.append(col.getColumnName());
+        }
+        return buf.toString();
+    }
+
+    /** Configures the flat-table input format, the mapper, the combiner and the map output types. */
+    private void setupMapper(IISegment segment) throws IOException {
+        IMRInput.IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment).getFlatTableInputFormat();
+        flatTableInputFormat.configureJob(job);
+
+        job.setMapperClass(IIDistinctColumnsMapper.class);
+        job.setCombinerClass(IIDistinctColumnsCombiner.class);
+        job.setMapOutputKeyClass(ShortWritable.class);
+        job.setMapOutputValueClass(Text.class);
+    }
+
+    /**
+     * Configures a single reducer writing under {@code output}. The output path is also
+     * stored in the configuration, and any existing content at that path is deleted first.
+     */
+    private void setupReducer(Path output) throws IOException {
+        job.setReducerClass(IIDistinctColumnsReducer.class);
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        job.setOutputKeyClass(NullWritable.class);
+        job.setOutputValueClass(Text.class);
+
+        FileOutputFormat.setOutputPath(job, output);
+        job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
+
+        job.setNumReduceTasks(1);
+
+        deletePath(job.getConfiguration(), output);
+    }
+
+    /** Command-line entry point; exits the JVM with the tool's return code. */
+    public static void main(String[] args) throws Exception {
+        IIDistinctColumnsJob job = new IIDistinctColumnsJob();
+        int exitCode = ToolRunner.run(job, args);
+        System.exit(exitCode);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsMapper.java
new file mode 100644
index 0000000..c431ecd
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsMapper.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+
+/**
+ * Mapper that parses each flat-table record into string columns. For every
+ * non-null column it emits (column position, UTF-8-encoded value via {@code Bytes.toBytes}).
+ *
+ * @author yangli9
+ */
+public class IIDistinctColumnsMapper<KEYIN> extends KylinMapper<KEYIN, Object, ShortWritable, Text> {
+
+    private ShortWritable outputKey = new ShortWritable();
+    private Text outputValue = new Text();
+
+    /** Turns raw input records into string columns; resolved in {@link #setup}. */
+    protected IMRInput.IMRTableInputFormat flatTableInputFormat;
+
+    /** Loads Kylin metadata and resolves the input format of the configured II's first segment. */
+    @Override
+    protected void setup(Context context) throws IOException {
+        Configuration conf = context.getConfiguration();
+        super.bindCurrentConfiguration(conf);
+
+        KylinConfig kylinConfig = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        String iiName = conf.get(BatchConstants.CFG_II_NAME);
+        IISegment firstSegment = IIManager.getInstance(kylinConfig).getII(iiName).getFirstSegment();
+        flatTableInputFormat = MRUtil.getBatchCubingInputSide(firstSegment).getFlatTableInputFormat();
+    }
+
+    /** Emits one record per non-null column, keyed by the column's position in the parsed row. */
+    @Override
+    public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
+        String[] columns = flatTableInputFormat.parseMapperInput(record);
+
+        for (short col = 0; col < columns.length; col++) {
+            String value = columns[col];
+            if (value != null) {
+                byte[] encoded = Bytes.toBytes(value);
+                outputKey.set(col);
+                outputValue.set(encoded, 0, encoded.length);
+                context.write(outputKey, outputValue);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsReducer.java
new file mode 100644
index 0000000..d50385f
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsReducer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import java.io.IOException;
+import java.util.HashSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.engine.mr.KylinReducer;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+
+/**
+ * Reducer that writes the distinct values of each column as newline-terminated
+ * lines into a file named after the column. The file is placed under the
+ * directory configured as {@code BatchConstants.OUTPUT_PATH}.
+ * <p>
+ * The map output key is the column's position in {@code BatchConstants.TABLE_COLUMNS}.
+ *
+ * @author yangli9
+ */
+public class IIDistinctColumnsReducer extends KylinReducer<ShortWritable, Text, NullWritable, Text> {
+
+    /** Column names indexed by the column position used as the map output key. */
+    private String[] columns;
+
+    /** Directory that receives one distinct-value file per column. */
+    private String outputPath;
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+
+        Configuration conf = context.getConfiguration();
+        this.columns = conf.get(BatchConstants.TABLE_COLUMNS).split(",");
+        // loop-invariant: read once instead of on every reduce() call
+        this.outputPath = conf.get(BatchConstants.OUTPUT_PATH);
+    }
+
+    /** Deduplicates the values of one column and writes them to {@code outputPath/<columnName>}. */
+    @Override
+    public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+        String columnName = columns[key.get()];
+
+        HashSet<ByteArray> set = new HashSet<ByteArray>();
+        for (Text textValue : values) {
+            // copy only the valid bytes; getBytes() may expose a larger, reused buffer
+            ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
+            set.add(value);
+        }
+
+        // Resolve the filesystem from the path itself. FileSystem.get(conf) always returns
+        // the default filesystem and rejects ("Wrong FS") a path qualified for another one.
+        Path columnFile = new Path(outputPath, columnName);
+        FileSystem fs = columnFile.getFileSystem(context.getConfiguration());
+        FSDataOutputStream out = fs.create(columnFile);
+
+        try {
+            for (ByteArray value : set) {
+                out.write(value.array(), value.offset(), value.length());
+                out.write('\n');
+            }
+        } finally {
+            out.close();
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJob.java
new file mode 100644
index 0000000..86fedf0
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJob.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.invertedindex;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+/**
+ * Chained executable for one inverted-index (II) job. The II name and the segment
+ * id are stored as job parameters under the keys "iiName" and "segmentId".
+ */
+public class IIJob extends DefaultChainedExecutable {
+
+    private static final String II_INSTANCE_NAME = "iiName";
+    private static final String SEGMENT_ID = "segmentId";
+
+    public IIJob() {
+        super();
+    }
+
+    void setIIName(String name) {
+        setParam(II_INSTANCE_NAME, name);
+    }
+
+    /** @return the II name stored in this job's parameters */
+    public String getIIName() {
+        return getParam(II_INSTANCE_NAME);
+    }
+
+    void setSegmentId(String segmentId) {
+        setParam(SEGMENT_ID, segmentId);
+    }
+
+    /** @return the segment id stored in this job's parameters */
+    public String getSegmentId() {
+        return getParam(SEGMENT_ID);
+    }
+
+    /** Creates a "BUILD" job for the given segment. */
+    public static IIJob createBuildJob(IISegment seg, String submitter, JobEngineConfig config) {
+        return initialJob(seg, "BUILD", submitter, config);
+    }
+
+    /**
+     * Creates a job bound to {@code seg}. The job name has the form
+     * "{@code <ii> - <segment> - <type> - <timestamp>}", with the timestamp rendered
+     * in the engine's configured time zone.
+     */
+    private static IIJob initialJob(IISegment seg, String type, String submitter, JobEngineConfig config) {
+        IIJob job = new IIJob();
+
+        // a fresh formatter per call: SimpleDateFormat is not thread-safe
+        SimpleDateFormat timestampFormat = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss");
+        timestampFormat.setTimeZone(TimeZone.getTimeZone(config.getTimeZone()));
+
+        String iiName = seg.getIIInstance().getName();
+        job.setIIName(iiName);
+        job.setSegmentId(seg.getUuid());
+
+        String timestamp = timestampFormat.format(new Date());
+        job.setName(iiName + " - " + seg.getName() + " - " + type + " - " + timestamp);
+        job.setSubmitter(submitter);
+        return job;
+    }
+
+}