Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/20 03:52:21 UTC

[1/5] incubator-kylin git commit: KYLIN-877 Add IMRInput javadoc

Repository: incubator-kylin
Updated Branches:
  refs/heads/KYLIN-878 [created] 9a82f39bd


KYLIN-877 Add IMRInput javadoc


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/2e89ea51
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/2e89ea51
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/2e89ea51

Branch: refs/heads/KYLIN-878
Commit: 2e89ea51dde565e9b308d6cbe5efae857a77e90d
Parents: ad7af7c
Author: Li, Yang <ya...@ebay.com>
Authored: Thu Jul 16 11:27:13 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Jul 17 10:29:04 2015 +0800

----------------------------------------------------------------------
 .../kylin/engine/mr/BatchCubingJobBuilder.java  |  2 ++
 .../org/apache/kylin/engine/mr/IMRInput.java    | 22 ++++++++++++++++++++
 2 files changed, 24 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2e89ea51/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index c1524e7..239ce64 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -49,9 +49,11 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
         // Phase 1: Create Flat Table
         inputSide.addStepPhase1_CreateFlatTable(result);
         
+        // Phase 2: Build Dictionary
         result.addTask(createFactDistinctColumnsStep(flatHiveTableDesc, jobId));
         result.addTask(createBuildDictionaryStep(jobId));
         
+        // Phase 3: Build Cube
         if (config.isInMemCubing()) {
             result.addTask(createSaveStatisticsStep(jobId));
             

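The phase markers introduced above follow the contract documented in IMRInput
(next hunk): the input side contributes Phase 1 and Phase 4, while the engine
owns Phases 2 and 3. As a rough sketch, the complete four-phase assembly might
look like the following; only the calls visible in this hunk are from the
commit, the surrounding scaffolding is assumed.

    // Sketch of the four-phase chained job; the fields and the Phase 3/4
    // wiring are assumptions, not part of the commit above.
    public DefaultChainedExecutable buildSketch() {
        DefaultChainedExecutable result = new DefaultChainedExecutable();
        String jobId = result.getId(); // assumption: the executable carries the job id

        // Phase 1: Create Flat Table (delegated to the input side)
        inputSide.addStepPhase1_CreateFlatTable(result);

        // Phase 2: Build Dictionary (engine-owned, as in the hunk)
        result.addTask(createFactDistinctColumnsStep(flatHiveTableDesc, jobId));
        result.addTask(createBuildDictionaryStep(jobId));

        // Phase 3: Build Cube (in-mem or layered MR steps, mostly elided here)
        if (config.isInMemCubing()) {
            result.addTask(createSaveStatisticsStep(jobId));
        }

        // Phase 4: Update Metadata & Cleanup (input side drops its flat table)
        inputSide.addStepPhase4_UpdateMetadataAndCleanup(result);

        return result;
    }
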
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2e89ea51/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
index c170b47..08ed94a 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -23,25 +23,47 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.metadata.model.TableDesc;
 
+/**
+ * Any ITableSource that wishes to serve as input of the MapReduce build engine must adapt to this interface.
+ */
 public interface IMRInput {
 
+    /** Return a helper to participate in the batch cubing job flow. */
     public IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg);
 
+    /** Return an InputFormat that reads from the specified table. */
     public IMRTableInputFormat getTableInputFormat(TableDesc table);
     
+    /**
+     * Utility that configures a mapper to read from a table.
+     */
     public interface IMRTableInputFormat {
         
+        /** Configure the InputFormat of the given job. */
         public void configureJob(Job job);
         
+        /** Parse a mapper input object into column values. */
         public String[] parseMapperInput(Object mapperInput);
     }
     
+    /**
+     * Participates in the batch cubing flow as the input side. Responsible for creating
+     * the intermediate flat table (Phase 1) and cleaning up if necessary (Phase 4).
+     * 
+     * - Phase 1: Create Flat Table
+     * - Phase 2: Build Dictionary
+     * - Phase 3: Build Cube
+     * - Phase 4: Update Metadata & Cleanup
+     */
     public interface IMRBatchCubingInputSide {
         
+        /** Add a step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc. */
         public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
         
+        /** Add a step that does any necessary cleanup, like deleting the intermediate flat table. */
         public void addStepPhase4_UpdateMetadataAndCleanup(DefaultChainedExecutable jobFlow);
         
+        /** Return an InputFormat that reads from the intermediate flat table. */
         public IMRTableInputFormat getFlatTableInputFormat();
     }
 }
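To make the new javadoc concrete, a hedged sketch of an IMRInput implementation
follows. Only the IMRInput interfaces above come from the commit; every
concrete class, helper, delimiter, and table name below is an invented
illustration, not Kylin API.

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.job.execution.DefaultChainedExecutable;
    import org.apache.kylin.metadata.model.TableDesc;

    // Hypothetical Hive-flavored adapter; all names except the IMRInput
    // interfaces are illustrative assumptions.
    public class SketchHiveMRInput implements IMRInput {

        @Override
        public IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
            return new SketchInputSide(seg);
        }

        @Override
        public IMRTableInputFormat getTableInputFormat(TableDesc table) {
            return new SketchTableInputFormat(table.getIdentity()); // assumed accessor
        }

        private static class SketchTableInputFormat implements IMRTableInputFormat {
            private final String tableName;

            SketchTableInputFormat(String tableName) {
                this.tableName = tableName;
            }

            @Override
            public void configureJob(Job job) {
                // Point the job's InputFormat at tableName, e.g. via HCatalog;
                // the concrete wiring is omitted in this sketch.
            }

            @Override
            public String[] parseMapperInput(Object mapperInput) {
                // Split one mapper record into column values; the delimiter
                // is an assumption for illustration.
                return mapperInput.toString().split("\177");
            }
        }

        private static class SketchInputSide implements IMRBatchCubingInputSide {
            private final CubeSegment seg;

            SketchInputSide(CubeSegment seg) {
                this.seg = seg;
            }

            @Override
            public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
                // Phase 1: add a step that materializes the intermediate flat
                // table, e.g. CREATE TABLE ... AS SELECT over the star schema.
            }

            @Override
            public void addStepPhase4_UpdateMetadataAndCleanup(DefaultChainedExecutable jobFlow) {
                // Phase 4: add a step that drops the Phase 1 flat table.
            }

            @Override
            public IMRTableInputFormat getFlatTableInputFormat() {
                // Table name is a made-up convention for the sketch.
                return new SketchTableInputFormat("kylin_intermediate_" + seg.getUuid());
            }
        }
    }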


[2/5] incubator-kylin git commit: KYLIN-878 HBase storage abstraction for cubing flow

Posted by li...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
deleted file mode 100644
index fdca42b..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
+++ /dev/null
@@ -1,663 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.kylin.common.persistence.HBaseConnection;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowValueDecoder;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseMappingDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.dict.lookup.LookupStringTable;
-import org.apache.kylin.metadata.filter.ColumnTupleFilter;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.LogicalTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.storage.ICachableStorageEngine;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.hbase.coprocessor.observer.ObserverEnabler;
-import org.apache.kylin.storage.tuple.TupleInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-/**
- * @author xjiang, yangli9
- */
-public class CubeStorageEngine implements ICachableStorageEngine {
-
-    private static final Logger logger = LoggerFactory.getLogger(CubeStorageEngine.class);
-
-    private static final int MERGE_KEYRANGE_THRESHOLD = 100;
-    private static final long MEM_BUDGET_PER_QUERY = 3L * 1024 * 1024 * 1024; // 3G
-
-    private final CubeInstance cubeInstance;
-    private final CubeDesc cubeDesc;
-    private final String uuid;
-
-    public CubeStorageEngine(CubeInstance cube) {
-        this.cubeInstance = cube;
-        this.cubeDesc = cube.getDescriptor();
-        this.uuid = cube.getUuid();
-    }
-
-    @Override
-    public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
-
-        Collection<TblColRef> groups = sqlDigest.groupbyColumns;
-        TupleFilter filter = sqlDigest.filter;
-
-        // build dimension & metrics
-        Collection<TblColRef> dimensions = new HashSet<TblColRef>();
-        Collection<FunctionDesc> metrics = new HashSet<FunctionDesc>();
-        buildDimensionsAndMetrics(dimensions, metrics, sqlDigest);
-
-        // all dimensions = groups + others
-        Set<TblColRef> others = Sets.newHashSet(dimensions);
-        others.removeAll(groups);
-
-        // expand derived
-        Set<TblColRef> derivedPostAggregation = Sets.newHashSet();
-        Set<TblColRef> groupsD = expandDerived(groups, derivedPostAggregation);
-        Set<TblColRef> othersD = expandDerived(others, derivedPostAggregation);
-        othersD.removeAll(groupsD);
-        derivedPostAggregation.removeAll(groups);
-
-        // identify cuboid
-        Set<TblColRef> dimensionsD = Sets.newHashSet();
-        dimensionsD.addAll(groupsD);
-        dimensionsD.addAll(othersD);
-        Cuboid cuboid = identifyCuboid(dimensionsD);
-        context.setCuboid(cuboid);
-
-        // isExactAggregation? meaning: tuples returned from storage require no further aggregation in the query engine
-        Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
-        boolean isExactAggregation = isExactAggregation(cuboid, groups, othersD, singleValuesD, derivedPostAggregation);
-        context.setExactAggregation(isExactAggregation);
-
-        // translate the filter into scan ranges and compose the returning groups for the coprocessor; note:
-        // - columns in non-evaluatable filters have to be returned
-        // - columns in loosened filters (due to derived translation) have to be returned
-        Set<TblColRef> groupsCopD = Sets.newHashSet(groupsD);
-        collectNonEvaluable(filter, groupsCopD);
-        TupleFilter filterD = translateDerived(filter, groupsCopD);
-
-        // flatten to OR-AND filter, (A AND B AND ..) OR (C AND D AND ..) OR ..
-        TupleFilter flatFilter = flattenToOrAndFilter(filterD);
-
-        // translate filter into segment scan ranges
-        List<HBaseKeyRange> scans = buildScanRanges(flatFilter, dimensionsD);
-
-        // check involved measures, build value decoder for each family:column
-        List<RowValueDecoder> valueDecoders = translateAggregation(cubeDesc.getHBaseMapping(), metrics, context);
-
-        //memory-hungry distinct counts are pushed down to the coprocessor, no need to set threshold any more
-        //setThreshold(dimensionsD, valueDecoders, context); // set cautious threshold to prevent out of memory
-        setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial
-        setLimit(filter, context);
-
-        HConnection conn = HBaseConnection.get(context.getConnUrl());
-        return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, valueDecoders, context, returnTupleInfo);
-    }
-
-    @Override
-    public Range<Long> getVolatilePeriod() {
-        return null;
-    }
-
-    @Override
-    public String getStorageUUID() {
-        return this.uuid;
-    }
-
-    @Override
-    public boolean isDynamic() {
-        return false;
-    }
-
-    private void buildDimensionsAndMetrics(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, SQLDigest sqlDigest) {
-
-        for (FunctionDesc func : sqlDigest.aggregations) {
-            if (!func.isDimensionAsMetric()) {
-                metrics.add(func);
-            }
-        }
-
-        for (TblColRef column : sqlDigest.allColumns) {
-            // skip measure columns
-            if (sqlDigest.metricColumns.contains(column)) {
-                continue;
-            }
-            dimensions.add(column);
-        }
-    }
-
-    private Cuboid identifyCuboid(Set<TblColRef> dimensions) {
-        long cuboidID = 0;
-        for (TblColRef column : dimensions) {
-            int index = cubeDesc.getRowkey().getColumnBitIndex(column);
-            cuboidID |= 1L << index;
-        }
-        return Cuboid.findById(cubeDesc, cuboidID);
-    }
-
-    private boolean isExactAggregation(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation) {
-        boolean exact = true;
-
-        if (cuboid.requirePostAggregation()) {
-            exact = false;
-            logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId());
-        }
-
-        // derived aggregation is bad, unless expanded columns are already in group by
-        if (groups.containsAll(derivedPostAggregation) == false) {
-            exact = false;
-            logger.info("exactAggregation is false because derived column require post aggregation: " + derivedPostAggregation);
-        }
-
-        // other columns (from filter) are bad, unless they are ensured to have a single value
-        if (singleValuesD.containsAll(othersD) == false) {
-            exact = false;
-            logger.info("exactAggregation is false because some column not on group by: " + othersD //
-                    + " (single value column: " + singleValuesD + ")");
-        }
-
-        if (exact) {
-            logger.info("exactAggregation is true");
-        }
-        return exact;
-    }
-
-    private Set<TblColRef> expandDerived(Collection<TblColRef> cols, Set<TblColRef> derivedPostAggregation) {
-        Set<TblColRef> expanded = Sets.newHashSet();
-        for (TblColRef col : cols) {
-            if (cubeDesc.isDerived(col)) {
-                DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
-                for (TblColRef hostCol : hostInfo.columns) {
-                    expanded.add(hostCol);
-                    if (hostInfo.isOneToOne == false)
-                        derivedPostAggregation.add(hostCol);
-                }
-            } else {
-                expanded.add(col);
-            }
-        }
-        return expanded;
-    }
-
-    @SuppressWarnings("unchecked")
-    private Set<TblColRef> findSingleValueColumns(TupleFilter filter) {
-        Collection<? extends TupleFilter> toCheck;
-        if (filter instanceof CompareTupleFilter) {
-            toCheck = Collections.singleton(filter);
-        } else if (filter instanceof LogicalTupleFilter && filter.getOperator() == FilterOperatorEnum.AND) {
-            toCheck = filter.getChildren();
-        } else {
-            return (Set<TblColRef>) Collections.EMPTY_SET;
-        }
-
-        Set<TblColRef> result = Sets.newHashSet();
-        for (TupleFilter f : toCheck) {
-            if (f instanceof CompareTupleFilter) {
-                CompareTupleFilter compFilter = (CompareTupleFilter) f;
-                // is COL=const ?
-                if (compFilter.getOperator() == FilterOperatorEnum.EQ && compFilter.getValues().size() == 1 && compFilter.getColumn() != null) {
-                    result.add(compFilter.getColumn());
-                }
-            }
-        }
-
-        // expand derived
-        Set<TblColRef> resultD = Sets.newHashSet();
-        for (TblColRef col : result) {
-            if (cubeDesc.isDerived(col)) {
-                DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
-                if (hostInfo.isOneToOne) {
-                    for (TblColRef hostCol : hostInfo.columns) {
-                        resultD.add(hostCol);
-                    }
-                }
-                //if not one2one, it will be pruned
-            } else {
-                resultD.add(col);
-            }
-        }
-        return resultD;
-    }
-
-    private void collectNonEvaluable(TupleFilter filter, Set<TblColRef> collector) {
-        if (filter == null)
-            return;
-
-        if (filter.isEvaluable()) {
-            for (TupleFilter child : filter.getChildren())
-                collectNonEvaluable(child, collector);
-        } else {
-            collectColumnsRecursively(filter, collector);
-        }
-    }
-
-    private void collectColumnsRecursively(TupleFilter filter, Set<TblColRef> collector) {
-        if (filter instanceof ColumnTupleFilter) {
-            collectColumns(((ColumnTupleFilter) filter).getColumn(), collector);
-        }
-        for (TupleFilter child : filter.getChildren()) {
-            collectColumnsRecursively(child, collector);
-        }
-    }
-
-    private void collectColumns(TblColRef col, Set<TblColRef> collector) {
-        if (cubeDesc.isDerived(col)) {
-            DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
-            for (TblColRef h : hostInfo.columns)
-                collector.add(h);
-        } else {
-            collector.add(col);
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    private TupleFilter translateDerived(TupleFilter filter, Set<TblColRef> collector) {
-        if (filter == null)
-            return filter;
-
-        if (filter instanceof CompareTupleFilter) {
-            return translateDerivedInCompare((CompareTupleFilter) filter, collector);
-        }
-
-        List<TupleFilter> children = (List<TupleFilter>) filter.getChildren();
-        List<TupleFilter> newChildren = Lists.newArrayListWithCapacity(children.size());
-        boolean modified = false;
-        for (TupleFilter child : children) {
-            TupleFilter translated = translateDerived(child, collector);
-            newChildren.add(translated);
-            if (child != translated)
-                modified = true;
-        }
-        if (modified) {
-            filter = replaceChildren(filter, newChildren);
-        }
-        return filter;
-    }
-
-    private TupleFilter replaceChildren(TupleFilter filter, List<TupleFilter> newChildren) {
-        if (filter instanceof LogicalTupleFilter) {
-            LogicalTupleFilter r = new LogicalTupleFilter(filter.getOperator());
-            r.addChildren(newChildren);
-            return r;
-        } else
-            throw new IllegalStateException("Cannot replaceChildren on " + filter);
-    }
-
-    private TupleFilter translateDerivedInCompare(CompareTupleFilter compf, Set<TblColRef> collector) {
-        if (compf.getColumn() == null || compf.getValues().isEmpty())
-            return compf;
-
-        TblColRef derived = compf.getColumn();
-        if (cubeDesc.isDerived(derived) == false)
-            return compf;
-
-        DeriveInfo hostInfo = cubeDesc.getHostInfo(derived);
-        CubeManager cubeMgr = CubeManager.getInstance(this.cubeInstance.getConfig());
-        CubeSegment seg = cubeInstance.getLatestReadySegment();
-        LookupStringTable lookup = cubeMgr.getLookupTable(seg, hostInfo.dimension);
-        Pair<TupleFilter, Boolean> translated = DerivedFilterTranslator.translate(lookup, hostInfo, compf);
-        TupleFilter translatedFilter = translated.getFirst();
-        boolean loosened = translated.getSecond();
-        if (loosened) {
-            collectColumnsRecursively(translatedFilter, collector);
-        }
-        return translatedFilter;
-    }
-
-    private List<RowValueDecoder> translateAggregation(HBaseMappingDesc hbaseMapping, Collection<FunctionDesc> metrics, //
-            StorageContext context) {
-        Map<HBaseColumnDesc, RowValueDecoder> codecMap = Maps.newHashMap();
-        for (FunctionDesc aggrFunc : metrics) {
-            Collection<HBaseColumnDesc> hbCols = hbaseMapping.findHBaseColumnByFunction(aggrFunc);
-            if (hbCols.isEmpty()) {
-                throw new IllegalStateException("can't find HBaseColumnDesc for function " + aggrFunc.getFullExpression());
-            }
-            HBaseColumnDesc bestHBCol = null;
-            int bestIndex = -1;
-            for (HBaseColumnDesc hbCol : hbCols) {
-                bestHBCol = hbCol;
-                bestIndex = hbCol.findMeasureIndex(aggrFunc);
-                MeasureDesc measure = hbCol.getMeasures()[bestIndex];
-                // criteria for holistic measure: Exact Aggregation && Exact Cuboid
-                if (measure.getFunction().isHolisticCountDistinct() && context.isExactAggregation()) {
-                    logger.info("Holistic count distinct chosen for " + aggrFunc);
-                    break;
-                }
-            }
-
-            RowValueDecoder codec = codecMap.get(bestHBCol);
-            if (codec == null) {
-                codec = new RowValueDecoder(bestHBCol);
-                codecMap.put(bestHBCol, codec);
-            }
-            codec.setIndex(bestIndex);
-        }
-        return new ArrayList<RowValueDecoder>(codecMap.values());
-    }
-
-    private TupleFilter flattenToOrAndFilter(TupleFilter filter) {
-        if (filter == null)
-            return null;
-
-        TupleFilter flatFilter = filter.flatFilter();
-
-        // normalize to OR-AND filter
-        if (flatFilter.getOperator() == FilterOperatorEnum.AND) {
-            LogicalTupleFilter f = new LogicalTupleFilter(FilterOperatorEnum.OR);
-            f.addChild(flatFilter);
-            flatFilter = f;
-        }
-
-        if (flatFilter.getOperator() != FilterOperatorEnum.OR)
-            throw new IllegalStateException();
-
-        return flatFilter;
-    }
-
-    private List<HBaseKeyRange> buildScanRanges(TupleFilter flatFilter, Collection<TblColRef> dimensionColumns) {
-
-        List<HBaseKeyRange> result = Lists.newArrayList();
-
-        logger.info("Current cubeInstance is " + cubeInstance + " with " + cubeInstance.getSegments().size() + " segs in all");
-        List<CubeSegment> segs = cubeInstance.getSegments(SegmentStatusEnum.READY);
-        logger.info("READY segs count: " + segs.size());
-
-        // build row key range for each cube segment
-        for (CubeSegment cubeSeg : segs) {
-
-            // consider derived (lookup snapshot), filter on dimension may
-            // differ per segment
-            List<Collection<ColumnValueRange>> orAndDimRanges = translateToOrAndDimRanges(flatFilter, cubeSeg);
-            if (orAndDimRanges == null) { // has conflict
-                continue;
-            }
-
-            List<HBaseKeyRange> scanRanges = Lists.newArrayListWithCapacity(orAndDimRanges.size());
-            for (Collection<ColumnValueRange> andDimRanges : orAndDimRanges) {
-                HBaseKeyRange rowKeyRange = new HBaseKeyRange(dimensionColumns, andDimRanges, cubeSeg, cubeDesc);
-                scanRanges.add(rowKeyRange);
-            }
-
-            List<HBaseKeyRange> mergedRanges = mergeOverlapRanges(scanRanges);
-            mergedRanges = mergeTooManyRanges(mergedRanges);
-            result.addAll(mergedRanges);
-        }
-
-        logger.info("hbasekeyrange count: " + result.size());
-        dropUnhitSegments(result);
-        logger.info("hbasekeyrange count after dropping unhit :" + result.size());
-
-        return result;
-    }
-
-    private List<Collection<ColumnValueRange>> translateToOrAndDimRanges(TupleFilter flatFilter, CubeSegment cubeSegment) {
-        List<Collection<ColumnValueRange>> result = Lists.newArrayList();
-
-        if (flatFilter == null) {
-            result.add(Collections.<ColumnValueRange> emptyList());
-            return result;
-        }
-
-        for (TupleFilter andFilter : flatFilter.getChildren()) {
-            if (andFilter.getOperator() != FilterOperatorEnum.AND) {
-                throw new IllegalStateException("Filter should be AND instead of " + andFilter);
-            }
-
-            Collection<ColumnValueRange> andRanges = translateToAndDimRanges(andFilter.getChildren(), cubeSegment);
-
-            result.add(andRanges);
-        }
-
-        return preprocessConstantConditions(result);
-    }
-
-    private List<Collection<ColumnValueRange>> preprocessConstantConditions(List<Collection<ColumnValueRange>> orAndRanges) {
-        boolean globalAlwaysTrue = false;
-        Iterator<Collection<ColumnValueRange>> iterator = orAndRanges.iterator();
-        while (iterator.hasNext()) {
-            Collection<ColumnValueRange> andRanges = iterator.next();
-            Iterator<ColumnValueRange> iterator2 = andRanges.iterator();
-            boolean hasAlwaysFalse = false;
-            while (iterator2.hasNext()) {
-                ColumnValueRange range = iterator2.next();
-                if (range.satisfyAll())
-                    iterator2.remove();
-                else if (range.satisfyNone())
-                    hasAlwaysFalse = true;
-            }
-            if (hasAlwaysFalse) {
-                iterator.remove();
-            } else if (andRanges.isEmpty()) {
-                globalAlwaysTrue = true;
-                break;
-            }
-        }
-        if (globalAlwaysTrue) {
-            orAndRanges.clear();
-            orAndRanges.add(Collections.<ColumnValueRange> emptyList());
-        }
-        return orAndRanges;
-    }
-
-    // return empty collection to mean true; return null to mean false
-    @SuppressWarnings("unchecked")
-    private Collection<ColumnValueRange> translateToAndDimRanges(List<? extends TupleFilter> andFilters, CubeSegment cubeSegment) {
-        Map<TblColRef, ColumnValueRange> rangeMap = new HashMap<TblColRef, ColumnValueRange>();
-        for (TupleFilter filter : andFilters) {
-            if ((filter instanceof CompareTupleFilter) == false) {
-                continue;
-            }
-
-            CompareTupleFilter comp = (CompareTupleFilter) filter;
-            if (comp.getColumn() == null) {
-                continue;
-            }
-
-            ColumnValueRange range = new ColumnValueRange(comp.getColumn(), (Collection<String>) comp.getValues(), comp.getOperator());
-            andMerge(range, rangeMap);
-        }
-
-        // a little pre-evaluation to remove invalid EQ/IN values and round start/end according to dictionary
-        Iterator<ColumnValueRange> it = rangeMap.values().iterator();
-        while (it.hasNext()) {
-            ColumnValueRange range = it.next();
-            range.preEvaluateWithDict((Dictionary<String>) cubeSegment.getDictionary(range.getColumn()));
-            if (range.satisfyAll())
-                it.remove();
-            else if (range.satisfyNone())
-                return null;
-        }
-
-        return rangeMap.values();
-    }
-
-    private void andMerge(ColumnValueRange range, Map<TblColRef, ColumnValueRange> rangeMap) {
-        ColumnValueRange columnRange = rangeMap.get(range.getColumn());
-        if (columnRange == null) {
-            rangeMap.put(range.getColumn(), range);
-        } else {
-            columnRange.andMerge(range);
-        }
-    }
-
-    private List<HBaseKeyRange> mergeOverlapRanges(List<HBaseKeyRange> keyRanges) {
-        if (keyRanges.size() <= 1) {
-            return keyRanges;
-        }
-
-        if (logger.isDebugEnabled()) {
-            logger.debug("Merging key range from " + keyRanges.size());
-        }
-
-        // sort ranges by start key
-        Collections.sort(keyRanges);
-
-        // merge the overlap range
-        List<HBaseKeyRange> mergedRanges = new LinkedList<HBaseKeyRange>();
-        int beginIndex = 0;
-        byte[] maxStopKey = keyRanges.get(0).getStopKey();
-        for (int index = 0; index < keyRanges.size(); index++) {
-            HBaseKeyRange keyRange = keyRanges.get(index);
-            if (Bytes.compareTo(maxStopKey, keyRange.getStartKey()) < 0) {
-                // merge the current key ranges
-                HBaseKeyRange mergedRange = mergeKeyRange(keyRanges, beginIndex, index - 1);
-                mergedRanges.add(mergedRange);
-                // start new merge
-                beginIndex = index;
-            }
-            if (Bytes.compareTo(maxStopKey, keyRange.getStopKey()) < 0) {
-                // update the stop key
-                maxStopKey = keyRange.getStopKey();
-            }
-        }
-        // merge last range
-        HBaseKeyRange mergedRange = mergeKeyRange(keyRanges, beginIndex, keyRanges.size() - 1);
-        mergedRanges.add(mergedRange);
-        if (logger.isDebugEnabled()) {
-            logger.debug("Merging key range to " + mergedRanges.size());
-        }
-        return mergedRanges;
-    }
-
-    private HBaseKeyRange mergeKeyRange(List<HBaseKeyRange> keyRanges, int from, int to) {
-        HBaseKeyRange keyRange = keyRanges.get(from);
-        int mergeSize = to - from + 1;
-        if (mergeSize > 1) {
-            // merge range from mergeHeader to i - 1
-            CubeSegment cubeSegment = keyRange.getCubeSegment();
-            Cuboid cuboid = keyRange.getCuboid();
-            byte[] startKey = keyRange.getStartKey();
-            byte[] stopKey = keyRange.getStopKey();
-            long partitionColumnStartDate = Long.MAX_VALUE;
-            long partitionColumnEndDate = 0;
-            List<Pair<byte[], byte[]>> newFuzzyKeys = new ArrayList<Pair<byte[], byte[]>>(mergeSize);
-            List<Collection<ColumnValueRange>> newFlatOrAndFilter = Lists.newLinkedList();
-
-            boolean hasNonFuzzyRange = false;
-            for (int k = from; k <= to; k++) {
-                HBaseKeyRange nextRange = keyRanges.get(k);
-                hasNonFuzzyRange = hasNonFuzzyRange || nextRange.getFuzzyKeys().isEmpty();
-                newFuzzyKeys.addAll(nextRange.getFuzzyKeys());
-                newFlatOrAndFilter.addAll(nextRange.getFlatOrAndFilter());
-                if (Bytes.compareTo(stopKey, nextRange.getStopKey()) < 0) {
-                    stopKey = nextRange.getStopKey();
-                }
-                if (nextRange.getPartitionColumnStartDate() > 0 && nextRange.getPartitionColumnStartDate() < partitionColumnStartDate) {
-                    partitionColumnStartDate = nextRange.getPartitionColumnStartDate();
-                }
-                if (nextRange.getPartitionColumnEndDate() < Long.MAX_VALUE && nextRange.getPartitionColumnEndDate() > partitionColumnEndDate) {
-                    partitionColumnEndDate = nextRange.getPartitionColumnEndDate();
-                }
-            }
-
-            // if any range is non-fuzzy, then all fuzzy keys must be cleared
-            if (hasNonFuzzyRange) {
-                newFuzzyKeys.clear();
-            }
-
-            partitionColumnStartDate = (partitionColumnStartDate == Long.MAX_VALUE) ? 0 : partitionColumnStartDate;
-            partitionColumnEndDate = (partitionColumnEndDate == 0) ? Long.MAX_VALUE : partitionColumnEndDate;
-            keyRange = new HBaseKeyRange(cubeSegment, cuboid, startKey, stopKey, newFuzzyKeys, newFlatOrAndFilter, partitionColumnStartDate, partitionColumnEndDate);
-        }
-        return keyRange;
-    }
-
-    private List<HBaseKeyRange> mergeTooManyRanges(List<HBaseKeyRange> keyRanges) {
-        if (keyRanges.size() < MERGE_KEYRANGE_THRESHOLD) {
-            return keyRanges;
-        }
-        // TODO: check the distance between range. and merge the large distance range
-        List<HBaseKeyRange> mergedRanges = new LinkedList<HBaseKeyRange>();
-        HBaseKeyRange mergedRange = mergeKeyRange(keyRanges, 0, keyRanges.size() - 1);
-        mergedRanges.add(mergedRange);
-        return mergedRanges;
-    }
-
-    private void dropUnhitSegments(List<HBaseKeyRange> scans) {
-        if (cubeDesc.getModel().getPartitionDesc().isPartitioned()) {
-            Iterator<HBaseKeyRange> iterator = scans.iterator();
-            while (iterator.hasNext()) {
-                HBaseKeyRange scan = iterator.next();
-                if (scan.hitSegment() == false) {
-                    iterator.remove();
-                }
-            }
-        }
-    }
-
-    private void setThreshold(Collection<TblColRef> dimensions, List<RowValueDecoder> valueDecoders, StorageContext context) {
-        if (RowValueDecoder.hasMemHungryCountDistinct(valueDecoders) == false) {
-            return;
-        }
-
-        int rowSizeEst = dimensions.size() * 3;
-        for (RowValueDecoder decoder : valueDecoders) {
-            MeasureDesc[] measures = decoder.getMeasures();
-            BitSet projectionIndex = decoder.getProjectionIndex();
-            for (int i = projectionIndex.nextSetBit(0); i >= 0; i = projectionIndex.nextSetBit(i + 1)) {
-                FunctionDesc func = measures[i].getFunction();
-                rowSizeEst += func.getReturnDataType().getSpaceEstimate();
-            }
-        }
-
-        long rowEst = MEM_BUDGET_PER_QUERY / rowSizeEst;
-        context.setThreshold((int) rowEst);
-    }
-
-    private void setLimit(TupleFilter filter, StorageContext context) {
-        boolean goodAggr = context.isExactAggregation();
-        boolean goodFilter = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled());
-        boolean goodSort = context.hasSort() == false;
-        if (goodAggr && goodFilter && goodSort) {
-            logger.info("Enable limit " + context.getLimit());
-            context.enableLimit();
-        }
-    }
-
-    private void setCoprocessor(Set<TblColRef> groupsCopD, List<RowValueDecoder> valueDecoders, StorageContext context) {
-        ObserverEnabler.enableCoprocessorIfBeneficial(cubeInstance, groupsCopD, valueDecoders, context);
-    }
-
-}
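One detail worth calling out before the renamed copy of this class below:
identifyCuboid() maps the queried dimension set to a cuboid ID by setting one
bit per row key column. A self-contained illustration of that arithmetic, with
an invented column-to-bit mapping standing in for
cubeDesc.getRowkey().getColumnBitIndex():

    // Standalone illustration of the cuboid-ID bitmask in identifyCuboid().
    // The bit indexes below are invented for the example; the real mapping
    // comes from the cube's row key definition.
    public class CuboidIdExample {
        public static void main(String[] args) {
            int[] queriedColumnBits = { 0, 2, 5 }; // e.g. CAL_DT, SITE, CATEGORY
            long cuboidId = 0;
            for (int index : queriedColumnBits) {
                cuboidId |= 1L << index;
            }
            // 1 + 4 + 32 = 37; each set bit marks one dimension of the cuboid.
            System.out.println(cuboidId); // prints 37
        }
    }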

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageQuery.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageQuery.java
new file mode 100644
index 0000000..1d76bd4
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageQuery.java
@@ -0,0 +1,663 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowValueDecoder;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseMappingDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.storage.ICachableStorageQuery;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.hbase.coprocessor.observer.ObserverEnabler;
+import org.apache.kylin.storage.tuple.TupleInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * @author xjiang, yangli9
+ */
+public class CubeStorageQuery implements ICachableStorageQuery {
+
+    private static final Logger logger = LoggerFactory.getLogger(CubeStorageQuery.class);
+
+    private static final int MERGE_KEYRANGE_THRESHOLD = 100;
+    private static final long MEM_BUDGET_PER_QUERY = 3L * 1024 * 1024 * 1024; // 3G
+
+    private final CubeInstance cubeInstance;
+    private final CubeDesc cubeDesc;
+    private final String uuid;
+
+    public CubeStorageQuery(CubeInstance cube) {
+        this.cubeInstance = cube;
+        this.cubeDesc = cube.getDescriptor();
+        this.uuid = cube.getUuid();
+    }
+
+    @Override
+    public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
+
+        Collection<TblColRef> groups = sqlDigest.groupbyColumns;
+        TupleFilter filter = sqlDigest.filter;
+
+        // build dimension & metrics
+        Collection<TblColRef> dimensions = new HashSet<TblColRef>();
+        Collection<FunctionDesc> metrics = new HashSet<FunctionDesc>();
+        buildDimensionsAndMetrics(dimensions, metrics, sqlDigest);
+
+        // all dimensions = groups + others
+        Set<TblColRef> others = Sets.newHashSet(dimensions);
+        others.removeAll(groups);
+
+        // expand derived
+        Set<TblColRef> derivedPostAggregation = Sets.newHashSet();
+        Set<TblColRef> groupsD = expandDerived(groups, derivedPostAggregation);
+        Set<TblColRef> othersD = expandDerived(others, derivedPostAggregation);
+        othersD.removeAll(groupsD);
+        derivedPostAggregation.removeAll(groups);
+
+        // identify cuboid
+        Set<TblColRef> dimensionsD = Sets.newHashSet();
+        dimensionsD.addAll(groupsD);
+        dimensionsD.addAll(othersD);
+        Cuboid cuboid = identifyCuboid(dimensionsD);
+        context.setCuboid(cuboid);
+
+        // isExactAggregation? meaning: tuples returned from storage require no further aggregation in the query engine
+        Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
+        boolean isExactAggregation = isExactAggregation(cuboid, groups, othersD, singleValuesD, derivedPostAggregation);
+        context.setExactAggregation(isExactAggregation);
+
+        // translate the filter into scan ranges and compose the returning groups for the coprocessor; note:
+        // - columns in non-evaluatable filters have to be returned
+        // - columns in loosened filters (due to derived translation) have to be returned
+        Set<TblColRef> groupsCopD = Sets.newHashSet(groupsD);
+        collectNonEvaluable(filter, groupsCopD);
+        TupleFilter filterD = translateDerived(filter, groupsCopD);
+
+        // flatten to OR-AND filter, (A AND B AND ..) OR (C AND D AND ..) OR ..
+        TupleFilter flatFilter = flattenToOrAndFilter(filterD);
+
+        // translate filter into segment scan ranges
+        List<HBaseKeyRange> scans = buildScanRanges(flatFilter, dimensionsD);
+
+        // check involved measures, build value decoder for each family:column
+        List<RowValueDecoder> valueDecoders = translateAggregation(cubeDesc.getHBaseMapping(), metrics, context);
+
+        //memory-hungry distinct counts are pushed down to the coprocessor, no need to set threshold any more
+        //setThreshold(dimensionsD, valueDecoders, context); // set cautious threshold to prevent out of memory
+        setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial
+        setLimit(filter, context);
+
+        HConnection conn = HBaseConnection.get(context.getConnUrl());
+        return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, valueDecoders, context, returnTupleInfo);
+    }
+
+    @Override
+    public Range<Long> getVolatilePeriod() {
+        return null;
+    }
+
+    @Override
+    public String getStorageUUID() {
+        return this.uuid;
+    }
+
+    @Override
+    public boolean isDynamic() {
+        return false;
+    }
+
+    private void buildDimensionsAndMetrics(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, SQLDigest sqlDigest) {
+
+        for (FunctionDesc func : sqlDigest.aggregations) {
+            if (!func.isDimensionAsMetric()) {
+                metrics.add(func);
+            }
+        }
+
+        for (TblColRef column : sqlDigest.allColumns) {
+            // skip measure columns
+            if (sqlDigest.metricColumns.contains(column)) {
+                continue;
+            }
+            dimensions.add(column);
+        }
+    }
+
+    private Cuboid identifyCuboid(Set<TblColRef> dimensions) {
+        long cuboidID = 0;
+        for (TblColRef column : dimensions) {
+            int index = cubeDesc.getRowkey().getColumnBitIndex(column);
+            cuboidID |= 1L << index;
+        }
+        return Cuboid.findById(cubeDesc, cuboidID);
+    }
+
+    private boolean isExactAggregation(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation) {
+        boolean exact = true;
+
+        if (cuboid.requirePostAggregation()) {
+            exact = false;
+            logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId());
+        }
+
+        // derived aggregation is bad, unless expanded columns are already in group by
+        if (groups.containsAll(derivedPostAggregation) == false) {
+            exact = false;
+            logger.info("exactAggregation is false because derived column require post aggregation: " + derivedPostAggregation);
+        }
+
+        // other columns (from filter) are bad, unless they are ensured to have a single value
+        if (singleValuesD.containsAll(othersD) == false) {
+            exact = false;
+            logger.info("exactAggregation is false because some column not on group by: " + othersD //
+                    + " (single value column: " + singleValuesD + ")");
+        }
+
+        if (exact) {
+            logger.info("exactAggregation is true");
+        }
+        return exact;
+    }
+
+    private Set<TblColRef> expandDerived(Collection<TblColRef> cols, Set<TblColRef> derivedPostAggregation) {
+        Set<TblColRef> expanded = Sets.newHashSet();
+        for (TblColRef col : cols) {
+            if (cubeDesc.isDerived(col)) {
+                DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
+                for (TblColRef hostCol : hostInfo.columns) {
+                    expanded.add(hostCol);
+                    if (hostInfo.isOneToOne == false)
+                        derivedPostAggregation.add(hostCol);
+                }
+            } else {
+                expanded.add(col);
+            }
+        }
+        return expanded;
+    }
+
+    @SuppressWarnings("unchecked")
+    private Set<TblColRef> findSingleValueColumns(TupleFilter filter) {
+        Collection<? extends TupleFilter> toCheck;
+        if (filter instanceof CompareTupleFilter) {
+            toCheck = Collections.singleton(filter);
+        } else if (filter instanceof LogicalTupleFilter && filter.getOperator() == FilterOperatorEnum.AND) {
+            toCheck = filter.getChildren();
+        } else {
+            return (Set<TblColRef>) Collections.EMPTY_SET;
+        }
+
+        Set<TblColRef> result = Sets.newHashSet();
+        for (TupleFilter f : toCheck) {
+            if (f instanceof CompareTupleFilter) {
+                CompareTupleFilter compFilter = (CompareTupleFilter) f;
+                // is COL=const ?
+                if (compFilter.getOperator() == FilterOperatorEnum.EQ && compFilter.getValues().size() == 1 && compFilter.getColumn() != null) {
+                    result.add(compFilter.getColumn());
+                }
+            }
+        }
+
+        // expand derived
+        Set<TblColRef> resultD = Sets.newHashSet();
+        for (TblColRef col : result) {
+            if (cubeDesc.isDerived(col)) {
+                DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
+                if (hostInfo.isOneToOne) {
+                    for (TblColRef hostCol : hostInfo.columns) {
+                        resultD.add(hostCol);
+                    }
+                }
+                //if not one2one, it will be pruned
+            } else {
+                resultD.add(col);
+            }
+        }
+        return resultD;
+    }
+
+    private void collectNonEvaluable(TupleFilter filter, Set<TblColRef> collector) {
+        if (filter == null)
+            return;
+
+        if (filter.isEvaluable()) {
+            for (TupleFilter child : filter.getChildren())
+                collectNonEvaluable(child, collector);
+        } else {
+            collectColumnsRecursively(filter, collector);
+        }
+    }
+
+    private void collectColumnsRecursively(TupleFilter filter, Set<TblColRef> collector) {
+        if (filter instanceof ColumnTupleFilter) {
+            collectColumns(((ColumnTupleFilter) filter).getColumn(), collector);
+        }
+        for (TupleFilter child : filter.getChildren()) {
+            collectColumnsRecursively(child, collector);
+        }
+    }
+
+    private void collectColumns(TblColRef col, Set<TblColRef> collector) {
+        if (cubeDesc.isDerived(col)) {
+            DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
+            for (TblColRef h : hostInfo.columns)
+                collector.add(h);
+        } else {
+            collector.add(col);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private TupleFilter translateDerived(TupleFilter filter, Set<TblColRef> collector) {
+        if (filter == null)
+            return filter;
+
+        if (filter instanceof CompareTupleFilter) {
+            return translateDerivedInCompare((CompareTupleFilter) filter, collector);
+        }
+
+        List<TupleFilter> children = (List<TupleFilter>) filter.getChildren();
+        List<TupleFilter> newChildren = Lists.newArrayListWithCapacity(children.size());
+        boolean modified = false;
+        for (TupleFilter child : children) {
+            TupleFilter translated = translateDerived(child, collector);
+            newChildren.add(translated);
+            if (child != translated)
+                modified = true;
+        }
+        if (modified) {
+            filter = replaceChildren(filter, newChildren);
+        }
+        return filter;
+    }
+
+    private TupleFilter replaceChildren(TupleFilter filter, List<TupleFilter> newChildren) {
+        if (filter instanceof LogicalTupleFilter) {
+            LogicalTupleFilter r = new LogicalTupleFilter(filter.getOperator());
+            r.addChildren(newChildren);
+            return r;
+        } else
+            throw new IllegalStateException("Cannot replaceChildren on " + filter);
+    }
+
+    private TupleFilter translateDerivedInCompare(CompareTupleFilter compf, Set<TblColRef> collector) {
+        if (compf.getColumn() == null || compf.getValues().isEmpty())
+            return compf;
+
+        TblColRef derived = compf.getColumn();
+        if (cubeDesc.isDerived(derived) == false)
+            return compf;
+
+        DeriveInfo hostInfo = cubeDesc.getHostInfo(derived);
+        CubeManager cubeMgr = CubeManager.getInstance(this.cubeInstance.getConfig());
+        CubeSegment seg = cubeInstance.getLatestReadySegment();
+        LookupStringTable lookup = cubeMgr.getLookupTable(seg, hostInfo.dimension);
+        Pair<TupleFilter, Boolean> translated = DerivedFilterTranslator.translate(lookup, hostInfo, compf);
+        TupleFilter translatedFilter = translated.getFirst();
+        boolean loosened = translated.getSecond();
+        if (loosened) {
+            collectColumnsRecursively(translatedFilter, collector);
+        }
+        return translatedFilter;
+    }
+
+    private List<RowValueDecoder> translateAggregation(HBaseMappingDesc hbaseMapping, Collection<FunctionDesc> metrics, //
+            StorageContext context) {
+        Map<HBaseColumnDesc, RowValueDecoder> codecMap = Maps.newHashMap();
+        for (FunctionDesc aggrFunc : metrics) {
+            Collection<HBaseColumnDesc> hbCols = hbaseMapping.findHBaseColumnByFunction(aggrFunc);
+            if (hbCols.isEmpty()) {
+                throw new IllegalStateException("can't find HBaseColumnDesc for function " + aggrFunc.getFullExpression());
+            }
+            HBaseColumnDesc bestHBCol = null;
+            int bestIndex = -1;
+            for (HBaseColumnDesc hbCol : hbCols) {
+                bestHBCol = hbCol;
+                bestIndex = hbCol.findMeasureIndex(aggrFunc);
+                MeasureDesc measure = hbCol.getMeasures()[bestIndex];
+                // criteria for holistic measure: Exact Aggregation && Exact Cuboid
+                if (measure.getFunction().isHolisticCountDistinct() && context.isExactAggregation()) {
+                    logger.info("Holistic count distinct chosen for " + aggrFunc);
+                    break;
+                }
+            }
+
+            RowValueDecoder codec = codecMap.get(bestHBCol);
+            if (codec == null) {
+                codec = new RowValueDecoder(bestHBCol);
+                codecMap.put(bestHBCol, codec);
+            }
+            codec.setIndex(bestIndex);
+        }
+        return new ArrayList<RowValueDecoder>(codecMap.values());
+    }
+
+    private TupleFilter flattenToOrAndFilter(TupleFilter filter) {
+        if (filter == null)
+            return null;
+
+        TupleFilter flatFilter = filter.flatFilter();
+
+        // normalize to OR-AND filter
+        if (flatFilter.getOperator() == FilterOperatorEnum.AND) {
+            LogicalTupleFilter f = new LogicalTupleFilter(FilterOperatorEnum.OR);
+            f.addChild(flatFilter);
+            flatFilter = f;
+        }
+
+        if (flatFilter.getOperator() != FilterOperatorEnum.OR)
+            throw new IllegalStateException();
+
+        return flatFilter;
+    }
+
+    private List<HBaseKeyRange> buildScanRanges(TupleFilter flatFilter, Collection<TblColRef> dimensionColumns) {
+
+        List<HBaseKeyRange> result = Lists.newArrayList();
+
+        logger.info("Current cubeInstance is " + cubeInstance + " with " + cubeInstance.getSegments().size() + " segs in all");
+        List<CubeSegment> segs = cubeInstance.getSegments(SegmentStatusEnum.READY);
+        logger.info("READY segs count: " + segs.size());
+
+        // build row key range for each cube segment
+        for (CubeSegment cubeSeg : segs) {
+
+            // consider derived (lookup snapshot), filter on dimension may
+            // differ per segment
+            List<Collection<ColumnValueRange>> orAndDimRanges = translateToOrAndDimRanges(flatFilter, cubeSeg);
+            if (orAndDimRanges == null) { // has conflict
+                continue;
+            }
+
+            List<HBaseKeyRange> scanRanges = Lists.newArrayListWithCapacity(orAndDimRanges.size());
+            for (Collection<ColumnValueRange> andDimRanges : orAndDimRanges) {
+                HBaseKeyRange rowKeyRange = new HBaseKeyRange(dimensionColumns, andDimRanges, cubeSeg, cubeDesc);
+                scanRanges.add(rowKeyRange);
+            }
+
+            List<HBaseKeyRange> mergedRanges = mergeOverlapRanges(scanRanges);
+            mergedRanges = mergeTooManyRanges(mergedRanges);
+            result.addAll(mergedRanges);
+        }
+
+        logger.info("hbasekeyrange count: " + result.size());
+        dropUnhitSegments(result);
+        logger.info("hbasekeyrange count after dropping unhit :" + result.size());
+
+        return result;
+    }
+
+    private List<Collection<ColumnValueRange>> translateToOrAndDimRanges(TupleFilter flatFilter, CubeSegment cubeSegment) {
+        List<Collection<ColumnValueRange>> result = Lists.newArrayList();
+
+        if (flatFilter == null) {
+            result.add(Collections.<ColumnValueRange> emptyList());
+            return result;
+        }
+
+        for (TupleFilter andFilter : flatFilter.getChildren()) {
+            if (andFilter.getOperator() != FilterOperatorEnum.AND) {
+                throw new IllegalStateException("Filter should be AND instead of " + andFilter);
+            }
+
+            Collection<ColumnValueRange> andRanges = translateToAndDimRanges(andFilter.getChildren(), cubeSegment);
+
+            result.add(andRanges);
+        }
+
+        return preprocessConstantConditions(result);
+    }
+
+    private List<Collection<ColumnValueRange>> preprocessConstantConditions(List<Collection<ColumnValueRange>> orAndRanges) {
+        boolean globalAlwaysTrue = false;
+        Iterator<Collection<ColumnValueRange>> iterator = orAndRanges.iterator();
+        while (iterator.hasNext()) {
+            Collection<ColumnValueRange> andRanges = iterator.next();
+            Iterator<ColumnValueRange> iterator2 = andRanges.iterator();
+            boolean hasAlwaysFalse = false;
+            while (iterator2.hasNext()) {
+                ColumnValueRange range = iterator2.next();
+                if (range.satisfyAll())
+                    iterator2.remove();
+                else if (range.satisfyNone())
+                    hasAlwaysFalse = true;
+            }
+            if (hasAlwaysFalse) {
+                iterator.remove();
+            } else if (andRanges.isEmpty()) {
+                globalAlwaysTrue = true;
+                break;
+            }
+        }
+        if (globalAlwaysTrue) {
+            orAndRanges.clear();
+            orAndRanges.add(Collections.<ColumnValueRange> emptyList());
+        }
+        return orAndRanges;
+    }
+
+    // return empty collection to mean true; return null to mean false
+    @SuppressWarnings("unchecked")
+    private Collection<ColumnValueRange> translateToAndDimRanges(List<? extends TupleFilter> andFilters, CubeSegment cubeSegment) {
+        Map<TblColRef, ColumnValueRange> rangeMap = new HashMap<TblColRef, ColumnValueRange>();
+        for (TupleFilter filter : andFilters) {
+            if ((filter instanceof CompareTupleFilter) == false) {
+                continue;
+            }
+
+            CompareTupleFilter comp = (CompareTupleFilter) filter;
+            if (comp.getColumn() == null) {
+                continue;
+            }
+
+            ColumnValueRange range = new ColumnValueRange(comp.getColumn(), (Collection<String>) comp.getValues(), comp.getOperator());
+            andMerge(range, rangeMap);
+        }
+
+        // a little pre-evaluation to remove invalid EQ/IN values and round start/end according to dictionary
+        Iterator<ColumnValueRange> it = rangeMap.values().iterator();
+        while (it.hasNext()) {
+            ColumnValueRange range = it.next();
+            range.preEvaluateWithDict((Dictionary<String>) cubeSegment.getDictionary(range.getColumn()));
+            if (range.satisfyAll())
+                it.remove();
+            else if (range.satisfyNone())
+                return null;
+        }
+
+        return rangeMap.values();
+    }
+
+    private void andMerge(ColumnValueRange range, Map<TblColRef, ColumnValueRange> rangeMap) {
+        ColumnValueRange columnRange = rangeMap.get(range.getColumn());
+        if (columnRange == null) {
+            rangeMap.put(range.getColumn(), range);
+        } else {
+            columnRange.andMerge(range);
+        }
+    }
+
+    private List<HBaseKeyRange> mergeOverlapRanges(List<HBaseKeyRange> keyRanges) {
+        if (keyRanges.size() <= 1) {
+            return keyRanges;
+        }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Merging key range from " + keyRanges.size());
+        }
+
+        // sort ranges by start key
+        Collections.sort(keyRanges);
+
+        // merge the overlap range
+        List<HBaseKeyRange> mergedRanges = new LinkedList<HBaseKeyRange>();
+        int beginIndex = 0;
+        byte[] maxStopKey = keyRanges.get(0).getStopKey();
+        for (int index = 0; index < keyRanges.size(); index++) {
+            HBaseKeyRange keyRange = keyRanges.get(index);
+            if (Bytes.compareTo(maxStopKey, keyRange.getStartKey()) < 0) {
+                // merge the current key ranges
+                HBaseKeyRange mergedRange = mergeKeyRange(keyRanges, beginIndex, index - 1);
+                mergedRanges.add(mergedRange);
+                // start new merge
+                beginIndex = index;
+            }
+            if (Bytes.compareTo(maxStopKey, keyRange.getStopKey()) < 0) {
+                // update the stop key
+                maxStopKey = keyRange.getStopKey();
+            }
+        }
+        // merge last range
+        HBaseKeyRange mergedRange = mergeKeyRange(keyRanges, beginIndex, keyRanges.size() - 1);
+        mergedRanges.add(mergedRange);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Merging key range to " + mergedRanges.size());
+        }
+        return mergedRanges;
+    }
+
+    private HBaseKeyRange mergeKeyRange(List<HBaseKeyRange> keyRanges, int from, int to) {
+        HBaseKeyRange keyRange = keyRanges.get(from);
+        int mergeSize = to - from + 1;
+        if (mergeSize > 1) {
+            // merge ranges from index 'from' through 'to' into a single range
+            CubeSegment cubeSegment = keyRange.getCubeSegment();
+            Cuboid cuboid = keyRange.getCuboid();
+            byte[] startKey = keyRange.getStartKey();
+            byte[] stopKey = keyRange.getStopKey();
+            long partitionColumnStartDate = Long.MAX_VALUE;
+            long partitionColumnEndDate = 0;
+            List<Pair<byte[], byte[]>> newFuzzyKeys = new ArrayList<Pair<byte[], byte[]>>(mergeSize);
+            List<Collection<ColumnValueRange>> newFlatOrAndFilter = Lists.newLinkedList();
+
+            boolean hasNonFuzzyRange = false;
+            for (int k = from; k <= to; k++) {
+                HBaseKeyRange nextRange = keyRanges.get(k);
+                hasNonFuzzyRange = hasNonFuzzyRange || nextRange.getFuzzyKeys().isEmpty();
+                newFuzzyKeys.addAll(nextRange.getFuzzyKeys());
+                newFlatOrAndFilter.addAll(nextRange.getFlatOrAndFilter());
+                if (Bytes.compareTo(stopKey, nextRange.getStopKey()) < 0) {
+                    stopKey = nextRange.getStopKey();
+                }
+                if (nextRange.getPartitionColumnStartDate() > 0 && nextRange.getPartitionColumnStartDate() < partitionColumnStartDate) {
+                    partitionColumnStartDate = nextRange.getPartitionColumnStartDate();
+                }
+                if (nextRange.getPartitionColumnEndDate() < Long.MAX_VALUE && nextRange.getPartitionColumnEndDate() > partitionColumnEndDate) {
+                    partitionColumnEndDate = nextRange.getPartitionColumnEndDate();
+                }
+            }
+
+            // if any range is non-fuzzy, then all fuzzy keys must be cleared
+            if (hasNonFuzzyRange) {
+                newFuzzyKeys.clear();
+            }
+
+            partitionColumnStartDate = (partitionColumnStartDate == Long.MAX_VALUE) ? 0 : partitionColumnStartDate;
+            partitionColumnEndDate = (partitionColumnEndDate == 0) ? Long.MAX_VALUE : partitionColumnEndDate;
+            keyRange = new HBaseKeyRange(cubeSegment, cuboid, startKey, stopKey, newFuzzyKeys, newFlatOrAndFilter, partitionColumnStartDate, partitionColumnEndDate);
+        }
+        return keyRange;
+    }
+
+    private List<HBaseKeyRange> mergeTooManyRanges(List<HBaseKeyRange> keyRanges) {
+        if (keyRanges.size() < MERGE_KEYRANGE_THRESHOLD) {
+            return keyRanges;
+        }
+        // TODO: check the distance between ranges and merge only those close together, instead of collapsing all into one
+        List<HBaseKeyRange> mergedRanges = new LinkedList<HBaseKeyRange>();
+        HBaseKeyRange mergedRange = mergeKeyRange(keyRanges, 0, keyRanges.size() - 1);
+        mergedRanges.add(mergedRange);
+        return mergedRanges;
+    }
+
+    private void dropUnhitSegments(List<HBaseKeyRange> scans) {
+        if (cubeDesc.getModel().getPartitionDesc().isPartitioned()) {
+            Iterator<HBaseKeyRange> iterator = scans.iterator();
+            while (iterator.hasNext()) {
+                HBaseKeyRange scan = iterator.next();
+                if (!scan.hitSegment()) {
+                    iterator.remove();
+                }
+            }
+        }
+    }
+
+    private void setThreshold(Collection<TblColRef> dimensions, List<RowValueDecoder> valueDecoders, StorageContext context) {
+        if (!RowValueDecoder.hasMemHungryCountDistinct(valueDecoders)) {
+            return;
+        }
+
+        int rowSizeEst = dimensions.size() * 3;
+        for (RowValueDecoder decoder : valueDecoders) {
+            MeasureDesc[] measures = decoder.getMeasures();
+            BitSet projectionIndex = decoder.getProjectionIndex();
+            for (int i = projectionIndex.nextSetBit(0); i >= 0; i = projectionIndex.nextSetBit(i + 1)) {
+                FunctionDesc func = measures[i].getFunction();
+                rowSizeEst += func.getReturnDataType().getSpaceEstimate();
+            }
+        }
+
+        long rowEst = MEM_BUDGET_PER_QUERY / rowSizeEst;
+        context.setThreshold((int) rowEst);
+    }
+
+    private void setLimit(TupleFilter filter, StorageContext context) {
+        boolean goodAggr = context.isExactAggregation();
+        boolean goodFilter = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled());
+        boolean goodSort = !context.hasSort();
+        if (goodAggr && goodFilter && goodSort) {
+            logger.info("Enable limit " + context.getLimit());
+            context.enableLimit();
+        }
+    }
+
+    private void setCoprocessor(Set<TblColRef> groupsCopD, List<RowValueDecoder> valueDecoders, StorageContext context) {
+        ObserverEnabler.enableCoprocessorIfBeneficial(cubeInstance, groupsCopD, valueDecoders, context);
+    }
+
+}
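
For reference, mergeOverlapRanges() above is the classic sorted-interval sweep: sort
by start key, then fold every range that still starts at or before the running maximum
stop key into the current group. A minimal self-contained sketch of the same idea,
assuming simplified long-valued intervals in place of HBase byte[] row keys (class and
method names here are illustrative, not Kylin API):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    class IntervalMergeSketch {
        /** Merges overlapping [start, stop] intervals; sorts the input in place. */
        static List<long[]> mergeOverlaps(List<long[]> ranges) {
            ranges.sort(Comparator.comparingLong(r -> r[0])); // sort by start
            List<long[]> merged = new ArrayList<>();
            for (long[] r : ranges) {
                long[] last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
                if (last == null || last[1] < r[0]) {
                    merged.add(new long[] { r[0], r[1] });    // disjoint: open a new interval
                } else {
                    last[1] = Math.max(last[1], r[1]);        // overlap: extend the stop
                }
            }
            return merged;
        }
    }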

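preprocessConstantConditions() deserves a worked example. The filter is kept as an OR
of AND groups: within a group, an always-true range is simply dropped; any always-false
range discards the whole group; and a group that ends up empty makes the entire filter
always true, encoded as a single empty AND group. With hypothetical ranges,
(a AND alwaysTrue) OR (b AND alwaysFalse) reduces to just (a), while
(alwaysTrue) OR (b) reduces to the always-true marker and scans everything.
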
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java
deleted file mode 100644
index de85190..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase;
-
-import com.google.common.collect.Range;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.kylin.common.persistence.HBaseConnection;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.storage.ICachableStorageEngine;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.hbase.coprocessor.endpoint.EndpointTupleIterator;
-import org.apache.kylin.storage.tuple.TupleInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-
-/**
- * @author yangli9
- */
-public class InvertedIndexStorageEngine implements ICachableStorageEngine {
-
-    private static Logger logger = LoggerFactory.getLogger(InvertedIndexStorageEngine.class);
-
-    private IISegment seg;
-    private String uuid;
-    private EndpointTupleIterator dataIterator;
-
-    public InvertedIndexStorageEngine(IIInstance ii) {
-        this.seg = ii.getFirstSegment();
-        this.uuid = ii.getUuid();
-    }
-
-    @Override
-    public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
-        String tableName = seg.getStorageLocationIdentifier();
-
-        //HConnection is cached, so need not be closed
-        @SuppressWarnings("deprecation")
-        HConnection conn = HBaseConnection.get(context.getConnUrl());
-        try {
-            dataIterator = new EndpointTupleIterator(seg, sqlDigest.filter, sqlDigest.groupbyColumns, new ArrayList<>(sqlDigest.aggregations), context, conn, returnTupleInfo);
-            return dataIterator;
-        } catch (Throwable e) {
-            logger.error("Error when connecting to II htable " + tableName, e);
-            throw new IllegalStateException("Error when connecting to II htable " + tableName, e);
-        }
-    }
-
-    @Override
-    public Range<Long> getVolatilePeriod() {
-        return dataIterator.getCacheExcludedPeriod();
-    }
-
-    @Override
-    public String getStorageUUID() {
-        return this.uuid;
-    }
-
-    @Override
-    public boolean isDynamic() {
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageQuery.java b/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageQuery.java
new file mode 100644
index 0000000..762258d
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageQuery.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase;
+
+import com.google.common.collect.Range;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.storage.ICachableStorageQuery;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.hbase.coprocessor.endpoint.EndpointTupleIterator;
+import org.apache.kylin.storage.tuple.TupleInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+
+/**
+ * @author yangli9
+ */
+public class InvertedIndexStorageQuery implements ICachableStorageQuery {
+
+    private static final Logger logger = LoggerFactory.getLogger(InvertedIndexStorageQuery.class);
+
+    private IISegment seg;
+    private String uuid;
+    private EndpointTupleIterator dataIterator;
+
+    public InvertedIndexStorageQuery(IIInstance ii) {
+        this.seg = ii.getFirstSegment();
+        this.uuid = ii.getUuid();
+    }
+
+    @Override
+    public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
+        String tableName = seg.getStorageLocationIdentifier();
+
+        // HConnection is cached, so it need not be closed
+        HConnection conn = HBaseConnection.get(context.getConnUrl());
+        try {
+            dataIterator = new EndpointTupleIterator(seg, sqlDigest.filter, sqlDigest.groupbyColumns, new ArrayList<>(sqlDigest.aggregations), context, conn, returnTupleInfo);
+            return dataIterator;
+        } catch (Throwable e) {
+            logger.error("Error when connecting to II htable " + tableName, e);
+            throw new IllegalStateException("Error when connecting to II htable " + tableName, e);
+        }
+    }
+
+    @Override
+    public Range<Long> getVolatilePeriod() {
+        return dataIterator.getCacheExcludedPeriod();
+    }
+
+    @Override
+    public String getStorageUUID() {
+        return this.uuid;
+    }
+
+    @Override
+    public boolean isDynamic() {
+        return true;
+    }
+}
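
The rename from *StorageEngine to *StorageQuery runs through this whole commit. As the
test changes further below show, an ICachableStorageQuery is designed to be wrapped by
a caching layer: isDynamic() returning true (as here) pairs it with the dynamic cache,
and getVolatilePeriod() tells that cache which recent time range must not be served
from cache. A hedged usage sketch, following the wiring visible in DynamicCacheTest
(constructor arguments besides the query may be simplified here):

    // sketch only; assumes an IIInstance 'ii' and the query inputs are already at hand
    ICachableStorageQuery query = new InvertedIndexStorageQuery(ii);
    CacheFledgedDynamicStorageEngine cached = new CacheFledgedDynamicStorageEngine(query);
    ITupleIterator rows = cached.search(context, sqlDigest, returnTupleInfo);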

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
index 3243698..059f75b 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
@@ -7,7 +7,7 @@ import org.apache.kylin.metadata.tuple.CompoundTupleIterator;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
 import org.apache.kylin.storage.IStorageQuery;
 import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.StorageEngineFactory;
+import org.apache.kylin.storage.StorageFactory;
 import org.apache.kylin.storage.tuple.TupleInfo;
 
 import java.util.List;
@@ -23,7 +23,7 @@ public class HybridStorageEngine implements IStorageQuery {
         this.realizations = hybridInstance.getRealizations();
         storageEngines = new IStorageQuery[realizations.length];
         for (int i = 0; i < realizations.length; i++) {
-            storageEngines[i] = StorageEngineFactory.getStorageEngine(realizations[i]);
+            storageEngines[i] = StorageFactory.createQuery(realizations[i]);
         }
     }
 

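Only the factory call changes here, but it is a good place to recall what the hybrid
engine does: fan the query out to every underlying realization and stitch the results
back together. A simplified reconstruction of its search(), inferred from the imports
above (CompoundTupleIterator) and omitting any time-range pruning the real
implementation performs:

    public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
        List<ITupleIterator> results = Lists.newArrayList();
        for (IStorageQuery query : storageEngines) {
            results.add(query.search(context, sqlDigest, returnTupleInfo));
        }
        return new CompoundTupleIterator(results);
    }
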
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/test/java/org/apache/kylin/storage/test/DynamicCacheTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/test/DynamicCacheTest.java b/storage/src/test/java/org/apache/kylin/storage/test/DynamicCacheTest.java
index 3814902..46b5588 100644
--- a/storage/src/test/java/org/apache/kylin/storage/test/DynamicCacheTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/test/DynamicCacheTest.java
@@ -12,7 +12,7 @@ import org.apache.kylin.metadata.realization.SQLDigest;
 import org.apache.kylin.metadata.tuple.ITuple;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
 import org.apache.kylin.metadata.tuple.SimpleTupleIterator;
-import org.apache.kylin.storage.ICachableStorageEngine;
+import org.apache.kylin.storage.ICachableStorageQuery;
 import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.cache.CacheFledgedDynamicStorageEngine;
 import org.apache.kylin.storage.hbase.coprocessor.endpoint.TsConditionExtractor;
@@ -87,7 +87,7 @@ public class DynamicCacheTest {
         final AtomicInteger underlyingSEHitCount = new AtomicInteger(0);
         final List<Integer> returnedRowPerSearch = Lists.newArrayList();
 
-        CacheFledgedDynamicStorageEngine dynamicCache = new CacheFledgedDynamicStorageEngine(new ICachableStorageEngine() {
+        CacheFledgedDynamicStorageEngine dynamicCache = new CacheFledgedDynamicStorageEngine(new ICachableStorageQuery() {
             @Override
             public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
                 Range<Long> tsRagneInQuery = TsConditionExtractor.extractTsCondition(partitionCol, sqlDigest.filter);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/test/java/org/apache/kylin/storage/test/ITStorageTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/test/ITStorageTest.java b/storage/src/test/java/org/apache/kylin/storage/test/ITStorageTest.java
index 2d75535..4408b55 100644
--- a/storage/src/test/java/org/apache/kylin/storage/test/ITStorageTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/test/ITStorageTest.java
@@ -30,7 +30,7 @@ import org.apache.kylin.metadata.tuple.ITuple;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
 import org.apache.kylin.storage.IStorageQuery;
 import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.StorageEngineFactory;
+import org.apache.kylin.storage.StorageFactory;
 import org.apache.kylin.storage.hbase.ScanOutOfLimitException;
 import org.junit.*;
 
@@ -60,7 +60,7 @@ public class ITStorageTest extends HBaseMetadataTestCase {
         CubeManager cubeMgr = CubeManager.getInstance(getTestConfig());
         cube = cubeMgr.getCube("TEST_KYLIN_CUBE_WITHOUT_SLR_EMPTY");
         Assert.assertNotNull(cube);
-        storageEngine = StorageEngineFactory.getStorageEngine(cube);
+        storageEngine = StorageFactory.createQuery(cube);
         String url = KylinConfig.getInstanceFromEnv().getStorageUrl();
         context = new StorageContext();
         context.setConnUrl(url);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/test/java/org/apache/kylin/storage/test/StaticCacheTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/test/StaticCacheTest.java b/storage/src/test/java/org/apache/kylin/storage/test/StaticCacheTest.java
index 112f612..6889668 100644
--- a/storage/src/test/java/org/apache/kylin/storage/test/StaticCacheTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/test/StaticCacheTest.java
@@ -10,7 +10,7 @@ import org.apache.kylin.metadata.realization.SQLDigest;
 import org.apache.kylin.metadata.tuple.ITuple;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
 import org.apache.kylin.metadata.tuple.SimpleTupleIterator;
-import org.apache.kylin.storage.ICachableStorageEngine;
+import org.apache.kylin.storage.ICachableStorageQuery;
 import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.cache.CacheFledgedStaticStorageEngine;
 import org.apache.kylin.storage.tuple.Tuple;
@@ -45,7 +45,7 @@ public class StaticCacheTest  {
 
         final AtomicInteger underlyingSEHitCount = new AtomicInteger(0);
 
-        CacheFledgedStaticStorageEngine cacheFledgedStaticStorageEngine = new CacheFledgedStaticStorageEngine(new ICachableStorageEngine() {
+        CacheFledgedStaticStorageEngine cacheFledgedStaticStorageEngine = new CacheFledgedStaticStorageEngine(new ICachableStorageQuery() {
             @Override
             public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
                 underlyingSEHitCount.incrementAndGet();


[4/5] incubator-kylin git commit: KYLIN-878 HBase storage abstraction for cubing flow

Posted by li...@apache.org.
KYLIN-878 HBase storage abstraction for cubing flow


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/4d3af3a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/4d3af3a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/4d3af3a9

Branch: refs/heads/KYLIN-878
Commit: 4d3af3a91686eb638e0b5868986489626a83dffa
Parents: 2e89ea5
Author: Li, Yang <ya...@ebay.com>
Authored: Fri Jul 17 10:27:43 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Jul 17 10:29:08 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/engine/BuildEngineFactory.java |  20 +-
 .../apache/kylin/engine/IBatchCubingEngine.java |   3 +
 .../kylin/engine/mr/BatchCubingJobBuilder.java  | 137 +---
 .../kylin/engine/mr/BatchMergeJobBuilder.java   |  98 +--
 .../kylin/engine/mr/GarbageCollectionStep.java  | 149 -----
 .../org/apache/kylin/engine/mr/IMRInput.java    |   4 +-
 .../kylin/engine/mr/IMRJobFlowParticipant.java  |  34 -
 .../org/apache/kylin/engine/mr/IMROutput.java   |  46 +-
 .../kylin/engine/mr/JobBuilderSupport.java      | 152 ++---
 .../kylin/engine/mr/MRBatchCubingEngine.java    |  29 +-
 .../java/org/apache/kylin/engine/mr/MRUtil.java |  45 ++
 .../engine/mr2/BatchCubingJobBuilder2.java      | 102 +++
 .../kylin/engine/mr2/BatchMergeJobBuilder2.java | 107 +++
 .../org/apache/kylin/engine/mr2/IMROutput2.java |   5 +
 .../kylin/engine/mr2/MRBatchCubingEngine2.java  |  48 ++
 .../kylin/job/engine/JobEngineConfig.java       |   4 -
 .../cardinality/ColumnCardinalityMapper.java    |   4 +-
 .../cardinality/HiveColumnCardinalityJob.java   |   4 +-
 .../job/hadoop/cube/FactDistinctColumnsJob.java |   4 +-
 .../cube/FactDistinctColumnsMapperBase.java     |   4 +-
 .../kylin/job/hadoop/cubev2/InMemCuboidJob.java |   4 +-
 .../job/hadoop/cubev2/InMemCuboidMapper.java    |   4 +-
 .../apache/kylin/source/hive/HiveMRInput.java   |   2 +-
 .../java/org/apache/kylin/storage/IStorage.java |  28 -
 .../apache/kylin/storage/StorageFactory2.java   |  34 +
 .../kylin/storage/hbase/HBaseMROutput.java      |  38 +-
 .../kylin/storage/hbase/HBaseMRSteps.java       | 135 ++++
 .../apache/kylin/storage/hbase/MergeGCStep.java | 121 ++++
 .../kylin/query/enumerator/OLAPEnumerator.java  |   4 +-
 .../kylin/storage/ICachableStorageEngine.java   |  34 -
 .../kylin/storage/ICachableStorageQuery.java    |  33 +
 .../java/org/apache/kylin/storage/IStorage.java |  28 +
 .../kylin/storage/StorageEngineFactory.java     |  83 ---
 .../apache/kylin/storage/StorageFactory.java    |  85 +++
 .../AbstractCacheFledgedStorageEngine.java      |   6 +-
 .../cache/CacheFledgedDynamicStorageEngine.java |   4 +-
 .../cache/CacheFledgedStaticStorageEngine.java  |   4 +-
 .../kylin/storage/cube/CubeStorageEngine.java   | 371 -----------
 .../kylin/storage/cube/CubeStorageQuery.java    | 371 +++++++++++
 .../kylin/storage/hbase/CubeStorageEngine.java  | 663 -------------------
 .../kylin/storage/hbase/CubeStorageQuery.java   | 663 +++++++++++++++++++
 .../hbase/InvertedIndexStorageEngine.java       |  83 ---
 .../hbase/InvertedIndexStorageQuery.java        |  82 +++
 .../storage/hybrid/HybridStorageEngine.java     |   4 +-
 .../kylin/storage/test/DynamicCacheTest.java    |   4 +-
 .../kylin/storage/test/ITStorageTest.java       |   4 +-
 .../kylin/storage/test/StaticCacheTest.java     |   4 +-
 47 files changed, 2094 insertions(+), 1801 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java b/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
index c536deb..721b85a 100644
--- a/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
+++ b/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
@@ -18,22 +18,36 @@
 
 package org.apache.kylin.engine;
 
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.MRBatchCubingEngine;
+import org.apache.kylin.engine.mr2.MRBatchCubingEngine2;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 
 public class BuildEngineFactory {
     
-    private static final IBatchCubingEngine defaultBatch = new MRBatchCubingEngine();
+    private static IBatchCubingEngine defaultBatchEngine;
+    
+    public static IBatchCubingEngine defaultBatchEngine() {
+        if (defaultBatchEngine == null) {
+            KylinConfig conf = KylinConfig.getInstanceFromEnv();
+            if (conf.isCubingInMem()) {
+                defaultBatchEngine = new MRBatchCubingEngine2();
+            } else {
+                defaultBatchEngine = new MRBatchCubingEngine();
+            }
+        }
+        return defaultBatchEngine;
+    }
     
     /** Build a new cube segment, typically its time range appends to the end of current cube. */
     public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
-        return defaultBatch.createBatchCubingJob(newSegment, submitter);
+        return defaultBatchEngine().createBatchCubingJob(newSegment, submitter);
     }
     
     /** Merge multiple small segments into a big one. */
     public static DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
-        return defaultBatch.createBatchMergeJob(mergeSegment, submitter);
+        return defaultBatchEngine().createBatchMergeJob(mergeSegment, submitter);
     }
     
 }
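
One note on the lazy selection above: defaultBatchEngine() is not synchronized, so two
threads racing on the first call could each construct an engine (benign, since both
would pick the same class, but the field write is also not guaranteed visible). If that
ever matters, the initialization-on-demand holder idiom gives the same config-driven
choice thread-safely; a sketch:

    private static class Holder {
        static final IBatchCubingEngine INSTANCE =
                KylinConfig.getInstanceFromEnv().isCubingInMem()
                        ? new MRBatchCubingEngine2()
                        : new MRBatchCubingEngine();
    }

    public static IBatchCubingEngine defaultBatchEngine() {
        return Holder.INSTANCE;
    }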

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java b/job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
index 55a080c..904f557 100644
--- a/job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
+++ b/job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
@@ -29,4 +29,7 @@ public interface IBatchCubingEngine {
     /** Merge multiple small segments into a big one. */
     public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter);
     
+    public Class<?> getSourceInterface();
+    
+    public Class<?> getStorageInterface();
 }
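
The two new accessors let an engine declare which adapter interfaces it expects from
the source and storage sides. A sketch of how the MR engine would plausibly implement
them (that class's diff is not shown in this part of the patch, so this is inference):

    public class MRBatchCubingEngineSketch {
        public Class<?> getSourceInterface() {
            return org.apache.kylin.engine.mr.IMRInput.class;  // MR engine reads via IMRInput
        }
        public Class<?> getStorageInterface() {
            return org.apache.kylin.engine.mr.IMROutput.class; // and writes via IMROutput
        }
    }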

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index 239ce64..0ff3bef 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -21,108 +21,57 @@ package org.apache.kylin.engine.mr;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
-import org.apache.kylin.job.common.HadoopShellExecutable;
+import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
 import org.apache.kylin.job.common.MapReduceExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.hadoop.cube.BaseCuboidJob;
-import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsJob;
 import org.apache.kylin.job.hadoop.cube.NDCuboidJob;
-import org.apache.kylin.job.hadoop.cubev2.InMemCuboidJob;
-import org.apache.kylin.job.hadoop.cubev2.SaveStatisticsStep;
-import org.apache.kylin.job.hadoop.dict.CreateDictionaryJob;
 
 public class BatchCubingJobBuilder extends JobBuilderSupport {
 
     private final IMRBatchCubingInputSide inputSide;
-    
+    private final IMRBatchCubingOutputSide outputSide;
+
     public BatchCubingJobBuilder(CubeSegment newSegment, String submitter) {
         super(newSegment, submitter);
-        this.inputSide = MRBatchCubingEngine.getBatchCubingInputSide(seg);
+        this.inputSide = MRUtil.getBatchCubingInputSide(seg);
+        this.outputSide = MRUtil.getBatchCubingOutputSide(seg);
     }
 
     public CubingJob build() {
-        final CubingJob result = CubingJob.createBuildJob(seg,  submitter,  config);
+        final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
         final String jobId = result.getId();
         final String cuboidRootPath = getCuboidRootPath(jobId);
         final CubeJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
 
         // Phase 1: Create Flat Table
         inputSide.addStepPhase1_CreateFlatTable(result);
-        
+
         // Phase 2: Build Dictionary
         result.addTask(createFactDistinctColumnsStep(flatHiveTableDesc, jobId));
         result.addTask(createBuildDictionaryStep(jobId));
-        
+
         // Phase 3: Build Cube
-        if (config.isInMemCubing()) {
-            result.addTask(createSaveStatisticsStep(jobId));
-            
-            // create htable step
-            result.addTask(createCreateHTableStep(jobId));
-            result.addTask(createInMemCubingStep(flatHiveTableDesc, result.getId()));
-            // bulk load step
-            result.addTask(createBulkLoadStep(jobId));
-        } else {
-            final int groupRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getNCuboidBuildLevels();
-            final int totalRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
-            final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount);
-            
-            // base cuboid step
-            result.addTask(createBaseCuboidStep(flatHiveTableDesc, cuboidOutputTempPath, jobId));
-
-            // n dim cuboid steps
-            for (int i = 1; i <= groupRowkeyColumnsCount; i++) {
-                int dimNum = totalRowkeyColumnsCount - i;
-                result.addTask(createNDimensionCuboidStep(cuboidOutputTempPath, dimNum, totalRowkeyColumnsCount));
-            }
-            result.addTask(createRangeRowkeyDistributionStep(cuboidRootPath + "*", jobId));
-            // create htable step
-            result.addTask(createCreateHTableStep(jobId));
-            // generate hfiles step
-            result.addTask(createConvertCuboidToHfileStep(cuboidRootPath + "*", jobId));
-            // bulk load step
-            result.addTask(createBulkLoadStep(jobId));
+        final int groupRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getNCuboidBuildLevels();
+        final int totalRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
+        final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount);
+        // base cuboid step
+        result.addTask(createBaseCuboidStep(flatHiveTableDesc, cuboidOutputTempPath, jobId));
+        // n dim cuboid steps
+        for (int i = 1; i <= groupRowkeyColumnsCount; i++) {
+            int dimNum = totalRowkeyColumnsCount - i;
+            result.addTask(createNDimensionCuboidStep(cuboidOutputTempPath, dimNum, totalRowkeyColumnsCount));
         }
+        outputSide.addStepPhase3_BuildCube(result, cuboidRootPath);
 
         // Phase 4: Update Metadata & Cleanup
         result.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
-        inputSide.addStepPhase4_UpdateMetadataAndCleanup(result);
-
-        return result;
-    }
-
-    private MapReduceExecutable createFactDistinctColumnsStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId) {
-        MapReduceExecutable result = new MapReduceExecutable();
-        result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
-        result.setMapReduceJobClass(FactDistinctColumnsJob.class);
-        StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "output", getFactDistinctColumnsPath(jobId));
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(config.isInMemCubing()));
-        appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
-        appendExecCmdParameters(cmd, "statisticssamplingpercent", String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getCubeInstance().getName() + "_Step");
+        inputSide.addStepPhase4_Cleanup(result);
+        outputSide.addStepPhase4_Cleanup(result);
 
-        result.setMapReduceParams(cmd.toString());
         return result;
     }
 
-    private HadoopShellExecutable createBuildDictionaryStep(String jobId) {
-        // base cuboid job
-        HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
-        buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
-        StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", getFactDistinctColumnsPath(jobId));
-
-        buildDictionaryStep.setJobParams(cmd.toString());
-        buildDictionaryStep.setJobClass(CreateDictionaryJob.class);
-        return buildDictionaryStep;
-    }
-
     private MapReduceExecutable createBaseCuboidStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String[] cuboidOutputTempPath, String jobId) {
         // base cuboid job
         MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
@@ -165,52 +114,6 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
         return ndCuboidStep;
     }
 
-    private SaveStatisticsStep createSaveStatisticsStep(String jobId) {
-        SaveStatisticsStep result = new SaveStatisticsStep();
-        result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
-        result.setCubeName(seg.getCubeInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setStatisticsPath(getStatisticsPath(jobId));
-        return result;
-    }
-
-    private MapReduceExecutable createInMemCubingStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId) {
-        // base cuboid job
-        MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
-
-        StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, seg);
-
-        baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
-
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", getFlatHiveTableLocation(flatHiveTableDesc, jobId));
-        appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
-        appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "level", "0");
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-
-        baseCuboidStep.setMapReduceParams(cmd.toString());
-        baseCuboidStep.setMapReduceJobClass(InMemCuboidJob.class);
-        baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
-        return baseCuboidStep;
-    }
-
-    private UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(String jobId) {
-        final UpdateCubeInfoAfterBuildStep updateCubeInfoStep = new UpdateCubeInfoAfterBuildStep();
-        updateCubeInfoStep.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
-        updateCubeInfoStep.setCubeName(seg.getCubeInstance().getName());
-        updateCubeInfoStep.setSegmentId(seg.getUuid());
-        updateCubeInfoStep.setCubingJobId(jobId);
-        return updateCubeInfoStep;
-    }
-    
-    private String getFlatHiveTableLocation(CubeJoinedFlatTableDesc flatTableDesc, String jobId) {
-        return getJobWorkingDir(jobId) + "/" + flatTableDesc.getTableName();
-    }
-    
     private String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
         String[] paths = new String[groupRowkeyColumnsCount + 1];
         for (int i = 0; i <= groupRowkeyColumnsCount; i++) {

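With the storage-specific steps gone, assembling a build job is now storage-agnostic;
a usage sketch (the submitter string is illustrative):

    CubingJob job = new BatchCubingJobBuilder(newSegment, "ADMIN").build();
    // 'job' chains Phases 1-4 and can be handed to the job engine as a DefaultChainedExecutable
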
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
index cb2d8dd..6264ebd 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
@@ -22,23 +22,25 @@ import java.util.List;
 
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
 import org.apache.kylin.job.common.MapReduceExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.hadoop.cube.MergeCuboidJob;
-import org.apache.kylin.job.hadoop.cubev2.MergeCuboidFromHBaseJob;
-import org.apache.kylin.job.hadoop.cubev2.MergeStatisticsStep;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 public class BatchMergeJobBuilder extends JobBuilderSupport {
 
+    private final IMRBatchMergeOutputSide outputSide;
+
     public BatchMergeJobBuilder(CubeSegment mergeSegment, String submitter) {
         super(mergeSegment, submitter);
+        this.outputSide = MRUtil.getBatchMergeOutputSide(seg);
     }
 
     public CubingJob build() {
-        final CubingJob result = CubingJob.createMergeJob(seg,  submitter,  config);
+        final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
         final String jobId = result.getId();
         final String cuboidRootPath = getCuboidRootPath(jobId);
 
@@ -46,66 +48,23 @@ public class BatchMergeJobBuilder extends JobBuilderSupport {
         Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge");
         final List<String> mergingSegmentIds = Lists.newArrayList();
         final List<String> mergingCuboidPaths = Lists.newArrayList();
-        final List<String> mergingHTables = Lists.newArrayList();
         for (CubeSegment merging : mergingSegments) {
             mergingSegmentIds.add(merging.getUuid());
             mergingCuboidPaths.add(getCuboidRootPath(merging) + "*");
-            mergingHTables.add(merging.getStorageLocationIdentifier());
         }
 
+        // Phase 1: Merge Dictionary
         result.addTask(createMergeDictionaryStep(mergingSegmentIds));
-        
-        if (config.isInMemCubing()) {
-
-            String mergedStatisticsFolder = getStatisticsPath(jobId);
-            result.addTask(createMergeStatisticsStep(seg, mergingSegmentIds, mergedStatisticsFolder));
-
-            // create htable step
-            result.addTask(createCreateHTableStep(jobId));
-
-            String formattedTables = StringUtil.join(mergingHTables, ",");
-            result.addTask(createMergeCuboidDataFromHBaseStep(formattedTables, jobId));
-
-        } else {
-            // merge cuboid
-            String formattedPath = StringUtil.join(mergingCuboidPaths, ",");
-            result.addTask(createMergeCuboidDataStep(seg, formattedPath, cuboidRootPath));
-            
-            // convert htable
-            result.addTask(createRangeRowkeyDistributionStep(cuboidRootPath + "*", jobId));
-            // create htable step
-            result.addTask(createCreateHTableStep(jobId));
-            // generate hfiles step
-            result.addTask(createConvertCuboidToHfileStep(cuboidRootPath + "*", jobId));
-        }
-        
-        // bulk load step
-        result.addTask(createBulkLoadStep(jobId));
-
-        // update cube info
-        result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
 
-        result.addTask(createGarbageCollectionStep(mergingHTables, null));
+        // Phase 2: Merge Cube Files
+        String formattedPath = StringUtil.join(mergingCuboidPaths, ",");
+        result.addTask(createMergeCuboidDataStep(seg, formattedPath, cuboidRootPath));
+        outputSide.addStepPhase2_BuildCube(result, cuboidRootPath);
 
-        return result;
-    }
-    
-    private MergeDictionaryStep createMergeDictionaryStep(List<String> mergingSegmentIds) {
-        MergeDictionaryStep result = new MergeDictionaryStep();
-        result.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
-        result.setCubeName(seg.getCubeInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setMergingSegmentIds(mergingSegmentIds);
-        return result;
-    }
+        // Phase 3: Update Metadata & Cleanup
+        result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
+        outputSide.addStepPhase3_Cleanup(result);
 
-    private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
-        MergeStatisticsStep result = new MergeStatisticsStep();
-        result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
-        result.setCubeName(seg.getCubeInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setMergingSegmentIds(mergingSegmentIds);
-        result.setMergedStatisticsPath(mergedStatisticsFolder);
         return result;
     }
 
@@ -126,35 +85,4 @@ public class BatchMergeJobBuilder extends JobBuilderSupport {
         return mergeCuboidDataStep;
     }
 
-
-    private MapReduceExecutable createMergeCuboidDataFromHBaseStep(String inputTableNames, String jobId) {
-        MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
-        mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
-        StringBuilder cmd = new StringBuilder();
-
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", inputTableNames);
-        appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
-
-        mergeCuboidDataStep.setMapReduceParams(cmd.toString());
-        mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidFromHBaseJob.class);
-        mergeCuboidDataStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
-        return mergeCuboidDataStep;
-    }
-
-    private UpdateCubeInfoAfterMergeStep createUpdateCubeInfoAfterMergeStep(List<String> mergingSegmentIds, String jobId) {
-        UpdateCubeInfoAfterMergeStep result = new UpdateCubeInfoAfterMergeStep();
-        result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
-        result.setCubeName(seg.getCubeInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setMergingSegmentIds(mergingSegmentIds);
-        result.setCubingJobId(jobId);
-        return result;
-    }
-
-    
 }
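
The HTable bookkeeping dropped above has not disappeared: judging from the diffstat of
this commit, garbage collection of the merged-away HTables now lives behind
IMRBatchMergeOutputSide.addStepPhase3_Cleanup(), with the new storage-side MergeGCStep
as the HBase implementation.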

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/engine/mr/GarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/GarbageCollectionStep.java b/job/src/main/java/org/apache/kylin/engine/mr/GarbageCollectionStep.java
deleted file mode 100644
index d79f35d..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/GarbageCollectionStep.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.job.cmd.ShellCmdOutput;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.realization.IRealizationConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- * Drop the resources that is no longer needed, including intermediate hive table (after cube build) and hbase tables (after cube merge)
- */
-@Deprecated // only exists for backward compatibility
-public class GarbageCollectionStep extends AbstractExecutable {
-
-    private static final String OLD_HTABLES = "oldHTables";
-
-    private static final String OLD_HIVE_TABLE = "oldHiveTable";
-
-    private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);
-
-    public GarbageCollectionStep() {
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-
-        StringBuffer output = new StringBuffer();
-
-        final String hiveTable = this.getOldHiveTable();
-        if (StringUtils.isNotEmpty(hiveTable)) {
-            final String dropHiveCMD = "hive -e \"DROP TABLE IF EXISTS  " + hiveTable + ";\"";
-            ShellCmdOutput shellCmdOutput = new ShellCmdOutput();
-            try {
-                context.getConfig().getCliCommandExecutor().execute(dropHiveCMD, shellCmdOutput);
-                output.append("Hive table " + hiveTable + " is dropped. \n");
-            } catch (IOException e) {
-                logger.error("job:" + getId() + " execute finished with exception", e);
-                output.append(shellCmdOutput.getOutput()).append("\n").append(e.getLocalizedMessage());
-                return new ExecuteResult(ExecuteResult.State.ERROR, output.toString());
-            }
-        }
-
-
-        List<String> oldTables = getOldHTables();
-        if (oldTables != null && oldTables.size() > 0) {
-            String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
-            Configuration conf = HBaseConfiguration.create();
-            HBaseAdmin admin = null;
-            try {
-                admin = new HBaseAdmin(conf);
-                for (String table : oldTables) {
-                    if (admin.tableExists(table)) {
-                        HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table));
-                        String host = tableDescriptor.getValue(IRealizationConstants.HTableTag);
-                        if (metadataUrlPrefix.equalsIgnoreCase(host)) {
-                            if (admin.isTableEnabled(table)) {
-                                admin.disableTable(table);
-                            }
-                            admin.deleteTable(table);
-                            logger.debug("Dropped htable: " + table);
-                            output.append("HBase table " + table + " is dropped. \n");
-                        } else {
-                            logger.debug("Skip htable: " + table);
-                            output.append("Skip htable: " + table + ". \n");
-                        }
-                    }
-                }
-
-            } catch (IOException e) {
-                output.append("Got error when drop HBase table, exiting... \n");
-                // This should not block the merge job; Orphans should be cleaned up in StorageCleanupJob
-                return new ExecuteResult(ExecuteResult.State.ERROR, output.append(e.getLocalizedMessage()).toString());
-            } finally {
-                if (admin != null)
-                    try {
-                        admin.close();
-                    } catch (IOException e) {
-                        logger.error(e.getLocalizedMessage());
-                    }
-            }
-        }
-
-
-        return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
-    }
-
-    public void setOldHTables(List<String> ids) {
-        setParam(OLD_HTABLES, StringUtils.join(ids, ","));
-    }
-
-    private List<String> getOldHTables() {
-        final String ids = getParam(OLD_HTABLES);
-        if (ids != null) {
-            final String[] splitted = StringUtils.split(ids, ",");
-            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
-            for (String id : splitted) {
-                result.add(id);
-            }
-            return result;
-        } else {
-            return Collections.emptyList();
-        }
-    }
-
-    public void setOldHiveTable(String hiveTable) {
-        setParam(OLD_HIVE_TABLE, hiveTable);
-    }
-
-    private String getOldHiveTable() {
-        return getParam(OLD_HIVE_TABLE);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
index 08ed94a..fb4767e 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -48,7 +48,7 @@ public interface IMRInput {
     
     /**
      * Participate the batch cubing flow as the input side. Responsible for creating
-     * intermediate flat table (Phase 1) and clean up if necessary (Phase 4).
+     * intermediate flat table (Phase 1) and clean up any leftovers (Phase 4).
      * 
      * - Phase 1: Create Flat Table
      * - Phase 2: Build Dictionary
@@ -61,7 +61,7 @@ public interface IMRInput {
         public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
         
         /** Add step that does necessary clean up, like delete the intermediate flat table */
-        public void addStepPhase4_UpdateMetadataAndCleanup(DefaultChainedExecutable jobFlow);
+        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
         
         /** Return an InputFormat that reads from the intermediate flat table */
         public IMRTableInputFormat getFlatTableInputFormat();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/engine/mr/IMRJobFlowParticipant.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMRJobFlowParticipant.java b/job/src/main/java/org/apache/kylin/engine/mr/IMRJobFlowParticipant.java
deleted file mode 100644
index 6a94920..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/IMRJobFlowParticipant.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import java.util.List;
-
-import org.apache.kylin.job.execution.AbstractExecutable;
-
-public interface IMRJobFlowParticipant {
-
-    public List<? extends AbstractExecutable> contributePhase1CreateFlatTable(List<? extends AbstractExecutable> steps);
-    
-    public List<? extends AbstractExecutable> contributePhase2CreateDictionary(List<? extends AbstractExecutable> steps);
-    
-    public List<? extends AbstractExecutable> contributePhase3BuildCube(List<? extends AbstractExecutable> steps);
-    
-    public List<? extends AbstractExecutable> contributePhase4UpdateCubeMetadata(List<? extends AbstractExecutable> steps);
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
index 6e3a42c..8896a2e 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
@@ -18,9 +18,49 @@
 
 package org.apache.kylin.engine.mr;
 
-public interface IMROutput {
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
 
-    public IMRJobFlowParticipant createBuildFlowParticipant();
+public interface IMROutput {
 
-    public IMRJobFlowParticipant createMergeFlowParticipant();
+    /** Return a helper to participate in batch cubing job flow. */
+    public IMRBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg);
+    
+    /**
+     * Participates in the batch cubing flow as the output side. Responsible for saving
+     * the cuboid output to storage (Phase 3).
+     * 
+     * - Phase 1: Create Flat Table
+     * - Phase 2: Build Dictionary
+     * - Phase 3: Build Cube
+     * - Phase 4: Update Metadata & Cleanup
+     */
+    public interface IMRBatchCubingOutputSide {
+        
+        /** Add step that saves cuboid output from HDFS to storage. */
+        public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
+        
+        /** Add step that does any necessary clean up. */
+        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+    }
+    
+    /** Return a helper to participate in batch merge job flow. */
+    public IMRBatchMergeOutputSide getBatchMergeOutputSide(CubeSegment seg);
+    
+    /**
+     * Participates in the batch merge flow as the output side. Responsible for saving
+     * the cuboid output to storage (Phase 2).
+     * 
+     * - Phase 1: Merge Dictionary
+     * - Phase 2: Merge Cube
+     * - Phase 3: Update Metadata & Cleanup
+     */
+    public interface IMRBatchMergeOutputSide {
+        
+        /** Add step that saves cuboid output from HDFS to storage. */
+        public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
+        
+        /** Add step that does any necessary clean up. */
+        public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
+    }
 }

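For reference, a build flow consumes this contract by invoking the phase hooks in order. Below is a minimal sketch of that wiring, using only names that appear in this patch (the MRUtil lookup is added later in the same commit); the helper class itself is hypothetical, not part of the commit:

    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
    import org.apache.kylin.engine.mr.MRUtil;
    import org.apache.kylin.job.execution.DefaultChainedExecutable;

    // Hypothetical helper showing the intended call sequence.
    public class OutputSideWiringSketch {

        public static void addOutputSteps(CubeSegment seg, DefaultChainedExecutable jobFlow, String cuboidRootPath) {
            IMRBatchCubingOutputSide outputSide = MRUtil.getBatchCubingOutputSide(seg);
            outputSide.addStepPhase3_BuildCube(jobFlow, cuboidRootPath); // Phase 3: save cuboids to storage
            outputSide.addStepPhase4_Cleanup(jobFlow);                   // Phase 4: storage-side cleanup
        }
    }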
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 79430f6..4652269 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -23,120 +23,115 @@ import java.util.List;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.job.common.HadoopShellExecutable;
 import org.apache.kylin.job.common.MapReduceExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.hadoop.cube.CubeHFileJob;
-import org.apache.kylin.job.hadoop.cube.RangeKeyDistributionJob;
-import org.apache.kylin.job.hadoop.hbase.BulkLoadJob;
-import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
+import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsJob;
+import org.apache.kylin.job.hadoop.dict.CreateDictionaryJob;
 
 import com.google.common.base.Preconditions;
 
 /**
- * Hold reusable methods for real builders.
+ * Holds reusable steps for job builders.
  */
-abstract public class JobBuilderSupport {
+public class JobBuilderSupport {
 
-    protected final JobEngineConfig config;
-    protected final CubeSegment seg;
-    protected final String submitter;
+    final protected JobEngineConfig config;
+    final protected CubeSegment seg;
+    final protected String submitter;
 
-    protected JobBuilderSupport(CubeSegment seg, String submitter) {
+    public JobBuilderSupport(CubeSegment seg, String submitter) {
         Preconditions.checkNotNull(seg, "segment cannot be null");
         this.config = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
         this.seg = seg;
         this.submitter = submitter;
     }
     
-    protected MapReduceExecutable createRangeRowkeyDistributionStep(String inputPath, String jobId) {
-        MapReduceExecutable rowkeyDistributionStep = new MapReduceExecutable();
-        rowkeyDistributionStep.setName(ExecutableConstants.STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION);
+    public MapReduceExecutable createFactDistinctColumnsStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId) {
+        return createFactDistinctColumnsStep(flatHiveTableDesc, jobId, false);
+    }
+    
+    public MapReduceExecutable createFactDistinctColumnsStepWithStats(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId) {
+        return createFactDistinctColumnsStep(flatHiveTableDesc, jobId, true);
+    }
+    
+    private MapReduceExecutable createFactDistinctColumnsStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId, boolean withStats) {
+        MapReduceExecutable result = new MapReduceExecutable();
+        result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
+        result.setMapReduceJobClass(FactDistinctColumnsJob.class);
         StringBuilder cmd = new StringBuilder();
-
         appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "input", inputPath);
-        appendExecCmdParameters(cmd, "output", getRowkeyDistributionOutputPath(jobId));
         appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Region_Splits_Calculator_" + seg.getCubeInstance().getName() + "_Step");
+        appendExecCmdParameters(cmd, "output", getFactDistinctColumnsPath(jobId));
+        appendExecCmdParameters(cmd, "segmentname", seg.getName());
+        appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(withStats));
+        appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
+        appendExecCmdParameters(cmd, "statisticssamplingpercent", String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getCubeInstance().getName() + "_Step");
 
-        rowkeyDistributionStep.setMapReduceParams(cmd.toString());
-        rowkeyDistributionStep.setMapReduceJobClass(RangeKeyDistributionJob.class);
-        return rowkeyDistributionStep;
+        result.setMapReduceParams(cmd.toString());
+        return result;
     }
 
-    protected HadoopShellExecutable createCreateHTableStep(String jobId) {
-        HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
-        createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
+    public HadoopShellExecutable createBuildDictionaryStep(String jobId) {
+        // dictionary building job
+        HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
+        buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
         StringBuilder cmd = new StringBuilder();
         appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
         appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(config.isInMemCubing()));
-
-        createHtableStep.setJobParams(cmd.toString());
-        createHtableStep.setJobClass(CreateHTableJob.class);
+        appendExecCmdParameters(cmd, "input", getFactDistinctColumnsPath(jobId));
 
-        return createHtableStep;
+        buildDictionaryStep.setJobParams(cmd.toString());
+        buildDictionaryStep.setJobClass(CreateDictionaryJob.class);
+        return buildDictionaryStep;
     }
 
-    protected MapReduceExecutable createConvertCuboidToHfileStep(String inputPath, String jobId) {
-        MapReduceExecutable createHFilesStep = new MapReduceExecutable();
-        createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
-        StringBuilder cmd = new StringBuilder();
-
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "input", inputPath);
-        appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getCubeInstance().getName() + "_Step");
-
-        createHFilesStep.setMapReduceParams(cmd.toString());
-        createHFilesStep.setMapReduceJobClass(CubeHFileJob.class);
-        createHFilesStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
-
-        return createHFilesStep;
+    public UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(String jobId) {
+        final UpdateCubeInfoAfterBuildStep updateCubeInfoStep = new UpdateCubeInfoAfterBuildStep();
+        updateCubeInfoStep.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
+        updateCubeInfoStep.setCubeName(seg.getCubeInstance().getName());
+        updateCubeInfoStep.setSegmentId(seg.getUuid());
+        updateCubeInfoStep.setCubingJobId(jobId);
+        return updateCubeInfoStep;
     }
 
-    protected HadoopShellExecutable createBulkLoadStep(String jobId) {
-        HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
-        bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
-
-        StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "input", getHFilePath(jobId));
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-
-        bulkLoadStep.setJobParams(cmd.toString());
-        bulkLoadStep.setJobClass(BulkLoadJob.class);
-
-        return bulkLoadStep;
+    public MergeDictionaryStep createMergeDictionaryStep(List<String> mergingSegmentIds) {
+        MergeDictionaryStep result = new MergeDictionaryStep();
+        result.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
+        result.setCubeName(seg.getCubeInstance().getName());
+        result.setSegmentId(seg.getUuid());
+        result.setMergingSegmentIds(mergingSegmentIds);
+        return result;
     }
-
-    protected GarbageCollectionStep createGarbageCollectionStep(List<String> oldHtables, String interimTable) {
-        GarbageCollectionStep result = new GarbageCollectionStep();
-        result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
-        result.setOldHTables(oldHtables);
-        result.setOldHiveTable(interimTable);
+    
+    public UpdateCubeInfoAfterMergeStep createUpdateCubeInfoAfterMergeStep(List<String> mergingSegmentIds, String jobId) {
+        UpdateCubeInfoAfterMergeStep result = new UpdateCubeInfoAfterMergeStep();
+        result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
+        result.setCubeName(seg.getCubeInstance().getName());
+        result.setSegmentId(seg.getUuid());
+        result.setMergingSegmentIds(mergingSegmentIds);
+        result.setCubingJobId(jobId);
         return result;
     }
 
-    protected String getJobWorkingDir(String jobId) {
+    // ============================================================================
+
+    public String getJobWorkingDir(String jobId) {
         return getJobWorkingDir(config, jobId);
     }
     
-    protected String getCuboidRootPath(String jobId) {
+    public String getCuboidRootPath(String jobId) {
         return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/cuboid/";
     }
     
-    protected String getCuboidRootPath(CubeSegment seg) {
+    public String getCuboidRootPath(CubeSegment seg) {
         return getCuboidRootPath(seg.getLastBuildJobID());
     }
     
-    protected void appendMapReduceParameters(StringBuilder buf, CubeSegment seg) {
+    public void appendMapReduceParameters(StringBuilder buf, CubeSegment seg) {
         try {
             String jobConf = config.getHadoopJobConfFilePath(seg.getCubeDesc().getModel().getCapacity());
             if (jobConf != null && jobConf.length() > 0) {
@@ -146,25 +141,20 @@ abstract public class JobBuilderSupport {
             throw new RuntimeException(e);
         }
     }
+    
+    public String getFlatHiveTableLocation(CubeJoinedFlatTableDesc flatTableDesc, String jobId) {
+        return getJobWorkingDir(jobId) + "/" + flatTableDesc.getTableName();
+    }
 
-    protected String getFactDistinctColumnsPath(String jobId) {
+    public String getFactDistinctColumnsPath(String jobId) {
         return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/fact_distinct_columns";
     }
 
 
-    protected String getStatisticsPath(String jobId) {
+    public String getStatisticsPath(String jobId) {
         return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/statistics";
     }
 
-
-    protected String getHFilePath(String jobId) {
-        return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/";
-    }
-
-    protected String getRowkeyDistributionOutputPath(String jobId) {
-        return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats";
-    }
-
     // ============================================================================
     // static methods also shared by other job flow participant
     // ----------------------------------------------------------------------------

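With the step factories now public, any builder can compose the engine-neutral steps directly. A minimal sketch, assuming only classes that appear in this patch (the sketch class is hypothetical):

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
    import org.apache.kylin.engine.mr.CubingJob;
    import org.apache.kylin.engine.mr.JobBuilderSupport;
    import org.apache.kylin.job.engine.JobEngineConfig;

    // Hypothetical sketch: composing the engine-neutral steps into a build job.
    public class BuildStepsSketch {

        public static CubingJob sketchBuildJob(CubeSegment seg, String submitter) {
            JobEngineConfig config = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
            JobBuilderSupport steps = new JobBuilderSupport(seg, submitter);
            CubingJob job = CubingJob.createBuildJob(seg, submitter, config);
            String jobId = job.getId();

            CubeJoinedFlatTableDesc flatDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
            job.addTask(steps.createFactDistinctColumnsStep(flatDesc, jobId)); // scan distinct column values
            job.addTask(steps.createBuildDictionaryStep(jobId));               // build dictionaries from them
            job.addTask(steps.createUpdateCubeInfoAfterBuildStep(jobId));      // finalize segment metadata
            return job;
        }
    }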
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
index b374a99..61328c9 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
@@ -18,15 +18,9 @@
 
 package org.apache.kylin.engine.mr;
 
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.IBatchCubingEngine;
-import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
-import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.TableSourceFactory;
 
 public class MRBatchCubingEngine implements IBatchCubingEngine {
 
@@ -40,25 +34,14 @@ public class MRBatchCubingEngine implements IBatchCubingEngine {
         return new BatchMergeJobBuilder(mergeSegment, submitter).build();
     }
     
-    public static IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
-        TableDesc tableDesc = getTableDesc(seg.getCubeDesc().getFactTable());
-        return getMRInput(tableDesc).getBatchCubingInputSide(seg);
-    }
-
-    public static IMRTableInputFormat getTableInputFormat(String tableName) {
-        return getTableInputFormat(getTableDesc(tableName));
-    }
-
-    public static IMRTableInputFormat getTableInputFormat(TableDesc tableDesc) {
-        return getMRInput(tableDesc).getTableInputFormat(tableDesc);
-    }
-
-    private static IMRInput getMRInput(TableDesc tableDesc) {
-        return TableSourceFactory.createEngineAdapter(tableDesc, IMRInput.class);
+    @Override
+    public Class<?> getSourceInterface() {
+        return IMRInput.class;
     }
 
-    private static TableDesc getTableDesc(String tableName) {
-        return MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getTableDesc(tableName);
+    @Override
+    public Class<?> getStorageInterface() {
+        return IMROutput.class;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
new file mode 100644
index 0000000..099a614
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -0,0 +1,45 @@
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
+import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.TableSourceFactory;
+import org.apache.kylin.storage.StorageFactory2;
+
+public class MRUtil {
+
+    public static IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
+        TableDesc tableDesc = getTableDesc(seg.getCubeDesc().getFactTable());
+        return getMRInput(tableDesc).getBatchCubingInputSide(seg);
+    }
+
+    public static IMRTableInputFormat getTableInputFormat(String tableName) {
+        return getTableInputFormat(getTableDesc(tableName));
+    }
+
+    public static IMRTableInputFormat getTableInputFormat(TableDesc tableDesc) {
+        return getMRInput(tableDesc).getTableInputFormat(tableDesc);
+    }
+
+    private static IMRInput getMRInput(TableDesc tableDesc) {
+        return TableSourceFactory.createEngineAdapter(tableDesc, IMRInput.class);
+    }
+
+    private static TableDesc getTableDesc(String tableName) {
+        return MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getTableDesc(tableName);
+    }
+
+    public static IMRBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg) {
+        return StorageFactory2.createEngineAdapter(seg.getCubeInstance(), IMROutput.class).getBatchCubingOutputSide(seg);
+    }
+
+    public static IMRBatchMergeOutputSide getBatchMergeOutputSide(CubeSegment seg) {
+        return StorageFactory2.createEngineAdapter(seg.getCubeInstance(), IMROutput.class).getBatchMergeOutputSide(seg);
+    }
+
+}

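A typical use of the input-side lookup is configuring a Hadoop job to read a source table, as the cardinality and fact-distinct jobs below do. A minimal sketch (the table name is made up for illustration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
    import org.apache.kylin.engine.mr.MRUtil;

    // Hypothetical usage: let the table source configure a job's InputFormat.
    public class TableInputSketch {

        public static Job sketch() throws Exception {
            Job job = Job.getInstance(new Configuration(), "sketch");
            // "DEFAULT.FACT_TABLE" is a made-up table name for illustration
            IMRTableInputFormat input = MRUtil.getTableInputFormat("DEFAULT.FACT_TABLE");
            input.configureJob(job); // sets the InputFormat of the given job
            return job;
        }
    }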
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/engine/mr2/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr2/BatchCubingJobBuilder2.java b/job/src/main/java/org/apache/kylin/engine/mr2/BatchCubingJobBuilder2.java
new file mode 100644
index 0000000..e83db30
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr2/BatchCubingJobBuilder2.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr2;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.job.common.MapReduceExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.hadoop.cubev2.InMemCuboidJob;
+import org.apache.kylin.job.hadoop.cubev2.SaveStatisticsStep;
+
+public class BatchCubingJobBuilder2 extends JobBuilderSupport {
+
+    private final IMRBatchCubingInputSide inputSide;
+
+    public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
+        super(newSegment, submitter);
+        this.inputSide = MRUtil.getBatchCubingInputSide(seg);
+    }
+
+    public CubingJob build() {
+        final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
+        final String jobId = result.getId();
+        final CubeJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
+
+        // Phase 1: Create Flat Table
+        inputSide.addStepPhase1_CreateFlatTable(result);
+
+        // Phase 2: Build Dictionary
+        result.addTask(createFactDistinctColumnsStepWithStats(flatHiveTableDesc, jobId));
+        result.addTask(createBuildDictionaryStep(jobId));
+
+        // Phase 3: Build Cube
+        result.addTask(createSaveStatisticsStep(jobId)); //<<<<<
+
+        // create htable step
+        result.addTask(createCreateHTableStepWithStats(jobId)); //<<<<<
+        result.addTask(createInMemCubingStep(flatHiveTableDesc, result.getId()));
+        // bulk load step
+        result.addTask(createBulkLoadStep(jobId)); //<<<<<
+
+        // Phase 4: Update Metadata & Cleanup
+        result.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
+        inputSide.addStepPhase4_Cleanup(result);
+
+        return result;
+    }
+
+    private SaveStatisticsStep createSaveStatisticsStep(String jobId) {
+        SaveStatisticsStep result = new SaveStatisticsStep();
+        result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
+        result.setCubeName(seg.getCubeInstance().getName());
+        result.setSegmentId(seg.getUuid());
+        result.setStatisticsPath(getStatisticsPath(jobId));
+        return result;
+    }
+
+    private MapReduceExecutable createInMemCubingStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId) {
+        // in-mem cubing job, building all cuboids in one MR step
+        MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
+
+        StringBuilder cmd = new StringBuilder();
+        appendMapReduceParameters(cmd, seg);
+
+        baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
+
+        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "segmentname", seg.getName());
+        appendExecCmdParameters(cmd, "input", getFlatHiveTableLocation(flatHiveTableDesc, jobId));
+        appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
+        appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "level", "0");
+        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+
+        baseCuboidStep.setMapReduceParams(cmd.toString());
+        baseCuboidStep.setMapReduceJobClass(InMemCuboidJob.class);
+        baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
+        return baseCuboidStep;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/engine/mr2/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr2/BatchMergeJobBuilder2.java b/job/src/main/java/org/apache/kylin/engine/mr2/BatchMergeJobBuilder2.java
new file mode 100644
index 0000000..25ff082
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr2/BatchMergeJobBuilder2.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr2;
+
+import java.util.List;
+
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.job.common.MapReduceExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.hadoop.cubev2.MergeCuboidFromHBaseJob;
+import org.apache.kylin.job.hadoop.cubev2.MergeStatisticsStep;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class BatchMergeJobBuilder2 extends JobBuilderSupport {
+
+    public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
+        super(mergeSegment, submitter);
+    }
+
+    public CubingJob build() {
+        final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
+        final String jobId = result.getId();
+
+        final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+        Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge");
+        final List<String> mergingSegmentIds = Lists.newArrayList();
+        final List<String> mergingCuboidPaths = Lists.newArrayList();
+        final List<String> mergingHTables = Lists.newArrayList();
+        for (CubeSegment merging : mergingSegments) {
+            mergingSegmentIds.add(merging.getUuid());
+            mergingCuboidPaths.add(getCuboidRootPath(merging) + "*");
+            mergingHTables.add(merging.getStorageLocationIdentifier());
+        }
+
+        result.addTask(createMergeDictionaryStep(mergingSegmentIds));
+
+        String mergedStatisticsFolder = getStatisticsPath(jobId);
+        result.addTask(createMergeStatisticsStep(seg, mergingSegmentIds, mergedStatisticsFolder));
+
+        // create htable step
+        result.addTask(createCreateHTableStepWithStats(jobId)); //<<<<<
+
+        String formattedTables = StringUtil.join(mergingHTables, ",");
+        result.addTask(createMergeCuboidDataFromHBaseStep(formattedTables, jobId));
+
+        // bulk load step
+        result.addTask(createBulkLoadStep(jobId)); //<<<<<
+
+        // update cube info
+        result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
+
+        result.addTask(createGarbageCollectionStep(mergingHTables, null)); //<<<<<
+
+        return result;
+    }
+
+    private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
+        MergeStatisticsStep result = new MergeStatisticsStep();
+        result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
+        result.setCubeName(seg.getCubeInstance().getName());
+        result.setSegmentId(seg.getUuid());
+        result.setMergingSegmentIds(mergingSegmentIds);
+        result.setMergedStatisticsPath(mergedStatisticsFolder);
+        return result;
+    }
+
+    private MapReduceExecutable createMergeCuboidDataFromHBaseStep(String inputTableNames, String jobId) {
+        MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
+        mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
+        StringBuilder cmd = new StringBuilder();
+
+        appendMapReduceParameters(cmd, seg);
+        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "segmentname", seg.getName());
+        appendExecCmdParameters(cmd, "input", inputTableNames);
+        appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
+        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
+
+        mergeCuboidDataStep.setMapReduceParams(cmd.toString());
+        mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidFromHBaseJob.class);
+        mergeCuboidDataStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
+        return mergeCuboidDataStep;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/engine/mr2/IMROutput2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr2/IMROutput2.java b/job/src/main/java/org/apache/kylin/engine/mr2/IMROutput2.java
new file mode 100644
index 0000000..aeddb9b
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr2/IMROutput2.java
@@ -0,0 +1,5 @@
+package org.apache.kylin.engine.mr2;
+
+public interface IMROutput2 {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/engine/mr2/MRBatchCubingEngine2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr2/MRBatchCubingEngine2.java b/job/src/main/java/org/apache/kylin/engine/mr2/MRBatchCubingEngine2.java
new file mode 100644
index 0000000..8ec6f69
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr2/MRBatchCubingEngine2.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr2;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.IBatchCubingEngine;
+import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public class MRBatchCubingEngine2 implements IBatchCubingEngine {
+
+    @Override
+    public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
+        return new BatchCubingJobBuilder2(newSegment, submitter).build();
+    }
+
+    @Override
+    public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
+        return new BatchMergeJobBuilder2(mergeSegment, submitter).build();
+    }
+    
+    @Override
+    public Class<?> getSourceInterface() {
+        return IMRInput.class;
+    }
+
+    @Override
+    public Class<?> getStorageInterface() {
+        return IMROutput2.class;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java b/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
index 422c6d7..2eb9b31 100644
--- a/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
+++ b/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
@@ -35,10 +35,6 @@ public class JobEngineConfig {
     public static String HADOOP_JOB_CONF_FILENAME = "kylin_job_conf";
     public static String HIVE_CONF_FILENAME = "kylin_hive_conf";
 
-    public boolean isInMemCubing() {
-        return config.isCubingInMem();
-    }
-    
     private static File getJobConfig(String fileName) {
         String path = System.getProperty(KylinConfig.KYLIN_CONF);
         if (StringUtils.isNotEmpty(path)) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
index 8da6cde..071554f 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
@@ -33,7 +33,7 @@ import org.apache.kylin.common.mr.KylinMapper;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
-import org.apache.kylin.engine.mr.MRBatchCubingEngine;
+import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.metadata.MetadataManager;
@@ -62,7 +62,7 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritab
         
         String tableName = conf.get(BatchConstants.TABLE_NAME);
         tableDesc = MetadataManager.getInstance(config).getTableDesc(tableName);
-        tableInputFormat = MRBatchCubingEngine.getTableInputFormat(tableDesc);
+        tableInputFormat = MRUtil.getTableInputFormat(tableDesc);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
index 58fd509..0670178 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
-import org.apache.kylin.engine.mr.MRBatchCubingEngine;
+import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 
@@ -77,7 +77,7 @@ public class HiveColumnCardinalityJob extends AbstractHadoopJob {
             job.getConfiguration().set("dfs.block.size", "67108864");
 
             // Mapper
-            IMRTableInputFormat tableInputFormat = MRBatchCubingEngine.getTableInputFormat(table);
+            IMRTableInputFormat tableInputFormat = MRUtil.getTableInputFormat(table);
             tableInputFormat.configureJob(job);
 
             job.setMapperClass(ColumnCardinalityMapper.class);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
index 77c4f7e..1697339 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
@@ -34,7 +34,7 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
-import org.apache.kylin.engine.mr.MRBatchCubingEngine;
+import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
@@ -100,7 +100,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
     }
 
     private void setupMapper(CubeSegment cubeSeg) throws IOException {
-        IMRTableInputFormat flatTableInputFormat = MRBatchCubingEngine.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+        IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
         flatTableInputFormat.configureJob(job);
 
         job.setMapperClass(FactDistinctHiveColumnsMapper.class);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
index ee8ab3e..09dd3bf 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
@@ -18,7 +18,7 @@ import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.RowKeyDesc;
 import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
-import org.apache.kylin.engine.mr.MRBatchCubingEngine;
+import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
@@ -68,7 +68,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
             }
         }
         
-        flatTableInputFormat = MRBatchCubingEngine.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+        flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
     }
 
     protected void handleErrorRecord(String[] record, Exception ex) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
index f89b143..c3c9539 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
@@ -36,7 +36,7 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
-import org.apache.kylin.engine.mr.MRBatchCubingEngine;
+import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.metadata.model.DataModelDesc;
@@ -87,7 +87,7 @@ public class InMemCuboidJob extends AbstractHadoopJob {
             job.getConfiguration().set(BatchConstants.CUBE_CAPACITY, realizationCapacity.toString());
 
             // set Mapper
-            IMRTableInputFormat flatTableInputFormat = MRBatchCubingEngine.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+            IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
             flatTableInputFormat.configureJob(job);
             job.setMapperClass(InMemCuboidMapper.class);
             job.setMapOutputKeyClass(ImmutableBytesWritable.class);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
index 3eb29df..b0d8b2c 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
@@ -26,7 +26,7 @@ import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.DimensionDesc;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
-import org.apache.kylin.engine.mr.MRBatchCubingEngine;
+import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.job.inmemcubing.DoggedCubeBuilder;
@@ -61,7 +61,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, Immutab
         cubeDesc = cube.getDescriptor();
         String segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
         cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
-        flatTableInputFormat = MRBatchCubingEngine.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat();
+        flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat();
 
         Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 4b01f24..f005bc9 100644
--- a/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -125,7 +125,7 @@ public class HiveMRInput implements IMRInput {
         }
 
         @Override
-        public void addStepPhase4_UpdateMetadataAndCleanup(DefaultChainedExecutable jobFlow) {
+        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
             GarbageCollectionStep step = new GarbageCollectionStep();
             step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
             step.setOldHiveTable(flatHiveTableDesc.getTableName());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/storage/IStorage.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/storage/IStorage.java b/job/src/main/java/org/apache/kylin/storage/IStorage.java
deleted file mode 100644
index 89b96e9..0000000
--- a/job/src/main/java/org/apache/kylin/storage/IStorage.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage;
-
-import org.apache.kylin.metadata.realization.IRealization;
-
-public interface IStorage {
-    
-    public IStorageQuery createStorageQuery(IRealization realization);
-
-    public <I> I adaptToBuildEngine(Class<I> engineInterface);
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/storage/StorageFactory2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/storage/StorageFactory2.java b/job/src/main/java/org/apache/kylin/storage/StorageFactory2.java
new file mode 100644
index 0000000..9a1dcd7
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/storage/StorageFactory2.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage;
+
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.storage.hbase.HBaseStorage;
+
+/**
+ * Adapts the storage layer (currently hardcoded to HBase) to a build engine interface.
+ */
+public class StorageFactory2 {
+    
+    private static final IStorage dft = new HBaseStorage();
+    
+    public static <T> T createEngineAdapter(IRealization realization, Class<T> engineInterface) {
+        return dft.adaptToBuildEngine(engineInterface);
+    }
+
+}

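Note the factory ignores the realization argument for now and always adapts HBase. A minimal usage sketch (hypothetical helper), matching how MRUtil resolves the output side:

    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.engine.mr.IMROutput;
    import org.apache.kylin.storage.StorageFactory2;

    // Hypothetical usage: obtain the MR output adapter for a cube's storage.
    public class StorageAdapterSketch {

        public static IMROutput outputFor(CubeSegment seg) {
            // The realization argument is currently ignored; HBase is the only storage.
            return StorageFactory2.createEngineAdapter(seg.getCubeInstance(), IMROutput.class);
        }
    }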
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput.java b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput.java
index 3de467e..5588d81 100644
--- a/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput.java
+++ b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput.java
@@ -18,21 +18,43 @@
 
 package org.apache.kylin.storage.hbase;
 
-import org.apache.kylin.engine.mr.IMRJobFlowParticipant;
+import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.IMROutput;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
 
 public class HBaseMROutput implements IMROutput {
 
     @Override
-    public IMRJobFlowParticipant createBuildFlowParticipant() {
-        // TODO Auto-generated method stub
-        return null;
+    public IMRBatchCubingOutputSide getBatchCubingOutputSide(final CubeSegment seg) {
+        return new IMRBatchCubingOutputSide() {
+            HBaseMRSteps steps = new HBaseMRSteps(seg);
+
+            @Override
+            public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath) {
+                steps.addSaveCuboidToHTableSteps(jobFlow, cuboidRootPath);
+            }
+
+            @Override
+            public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+                // nothing to do
+            }
+        };
     }
 
     @Override
-    public IMRJobFlowParticipant createMergeFlowParticipant() {
-        // TODO Auto-generated method stub
-        return null;
-    }
+    public IMRBatchMergeOutputSide getBatchMergeOutputSide(final CubeSegment seg) {
+        return new IMRBatchMergeOutputSide() {
+            HBaseMRSteps steps = new HBaseMRSteps(seg);
 
+            @Override
+            public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath) {
+                steps.addSaveCuboidToHTableSteps(jobFlow, cuboidRootPath);
+            }
+
+            @Override
+            public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
+                jobFlow.addTask(steps.createMergeGCStep());
+            }
+        };
+    }
 }


[3/5] incubator-kylin git commit: KYLIN-878 HBase storage abstraction for cubing flow

Posted by li...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMRSteps.java b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMRSteps.java
new file mode 100644
index 0000000..d4a0e98
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMRSteps.java
@@ -0,0 +1,135 @@
+package org.apache.kylin.storage.hbase;
+
+import java.util.List;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.job.common.HadoopShellExecutable;
+import org.apache.kylin.job.common.MapReduceExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.hadoop.cube.CubeHFileJob;
+import org.apache.kylin.job.hadoop.cube.RangeKeyDistributionJob;
+import org.apache.kylin.job.hadoop.hbase.BulkLoadJob;
+import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class HBaseMRSteps extends JobBuilderSupport {
+    
+    public HBaseMRSteps(CubeSegment seg) {
+        super(seg, null);
+    }
+
+    public void addSaveCuboidToHTableSteps(DefaultChainedExecutable jobFlow, String cuboidRootPath) {
+        String jobId = jobFlow.getId();
+        
+        // calculate key distribution
+        jobFlow.addTask(createRangeRowkeyDistributionStep(cuboidRootPath + "*", jobId));
+        // create htable step
+        jobFlow.addTask(createCreateHTableStep(jobId));
+        // generate hfiles step
+        jobFlow.addTask(createConvertCuboidToHfileStep(cuboidRootPath + "*", jobId));
+        // bulk load step
+        jobFlow.addTask(createBulkLoadStep(jobId));
+    }
+
+    public MapReduceExecutable createRangeRowkeyDistributionStep(String inputPath, String jobId) {
+        MapReduceExecutable rowkeyDistributionStep = new MapReduceExecutable();
+        rowkeyDistributionStep.setName(ExecutableConstants.STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION);
+        StringBuilder cmd = new StringBuilder();
+
+        appendMapReduceParameters(cmd, seg);
+        appendExecCmdParameters(cmd, "input", inputPath);
+        appendExecCmdParameters(cmd, "output", getRowkeyDistributionOutputPath(jobId));
+        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Region_Splits_Calculator_" + seg.getCubeInstance().getName() + "_Step");
+
+        rowkeyDistributionStep.setMapReduceParams(cmd.toString());
+        rowkeyDistributionStep.setMapReduceJobClass(RangeKeyDistributionJob.class);
+        return rowkeyDistributionStep;
+    }
+
+    public HadoopShellExecutable createCreateHTableStep(String jobId) {
+        return createCreateHTableStep(jobId, false);
+    }
+    
+    public HadoopShellExecutable createCreateHTableStepWithStats(String jobId) {
+        return createCreateHTableStep(jobId, true);
+    }
+    
+    private HadoopShellExecutable createCreateHTableStep(String jobId, boolean withStats) {
+        HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
+        createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
+        StringBuilder cmd = new StringBuilder();
+        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "segmentname", seg.getName());
+        appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
+        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+        appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(withStats));
+
+        createHtableStep.setJobParams(cmd.toString());
+        createHtableStep.setJobClass(CreateHTableJob.class);
+
+        return createHtableStep;
+    }
+
+    public MapReduceExecutable createConvertCuboidToHfileStep(String inputPath, String jobId) {
+        MapReduceExecutable createHFilesStep = new MapReduceExecutable();
+        createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
+        StringBuilder cmd = new StringBuilder();
+
+        appendMapReduceParameters(cmd, seg);
+        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "input", inputPath);
+        appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
+        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+        appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getCubeInstance().getName() + "_Step");
+
+        createHFilesStep.setMapReduceParams(cmd.toString());
+        createHFilesStep.setMapReduceJobClass(CubeHFileJob.class);
+        createHFilesStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
+
+        return createHFilesStep;
+    }
+
+    public HadoopShellExecutable createBulkLoadStep(String jobId) {
+        HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
+        bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
+
+        StringBuilder cmd = new StringBuilder();
+        appendExecCmdParameters(cmd, "input", getHFilePath(jobId));
+        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+
+        bulkLoadStep.setJobParams(cmd.toString());
+        bulkLoadStep.setJobClass(BulkLoadJob.class);
+
+        return bulkLoadStep;
+    }
+    
+    public MergeGCStep createMergeGCStep() {
+        final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+        Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge");
+        final List<String> mergingHTables = Lists.newArrayList();
+        for (CubeSegment merging : mergingSegments) {
+            mergingHTables.add(merging.getStorageLocationIdentifier());
+        }
+
+        MergeGCStep result = new MergeGCStep();
+        result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+        result.setOldHTables(mergingHTables);
+        return result;
+    }
+    
+    public String getHFilePath(String jobId) {
+        return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/";
+    }
+
+    public String getRowkeyDistributionOutputPath(String jobId) {
+        return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats";
+    }
+
+}

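Taken together, addSaveCuboidToHTableSteps appends the four HBase steps in a fixed order: rowkey distribution, HTable creation, HFile generation, bulk load. A minimal sketch of attaching them to a merge flow (the helper class is hypothetical):

    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.job.execution.DefaultChainedExecutable;
    import org.apache.kylin.storage.hbase.HBaseMRSteps;

    // Hypothetical sketch: appending the HBase output steps to a merge job flow.
    public class HBaseStepsSketch {

        public static void addHBaseSteps(CubeSegment seg, DefaultChainedExecutable jobFlow, String cuboidRootPath) {
            HBaseMRSteps steps = new HBaseMRSteps(seg);
            // rowkey distribution -> create HTable -> cuboid-to-HFile -> bulk load
            steps.addSaveCuboidToHTableSteps(jobFlow, cuboidRootPath);
            // drop the merged segments' old HTables afterwards
            jobFlow.addTask(steps.createMergeGCStep());
        }
    }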
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/storage/hbase/MergeGCStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/storage/hbase/MergeGCStep.java b/job/src/main/java/org/apache/kylin/storage/hbase/MergeGCStep.java
new file mode 100644
index 0000000..5fc57e3
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/storage/hbase/MergeGCStep.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Drop HBase tables that are no longer needed.
+ */
+public class MergeGCStep extends AbstractExecutable {
+
+    private static final String OLD_HTABLES = "oldHTables";
+
+    private static final Logger logger = LoggerFactory.getLogger(MergeGCStep.class);
+
+    public MergeGCStep() {
+        super();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+
+        StringBuffer output = new StringBuffer();
+
+        List<String> oldTables = getOldHTables();
+        if (oldTables != null && oldTables.size() > 0) {
+            String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
+            Configuration conf = HBaseConfiguration.create();
+            HBaseAdmin admin = null;
+            try {
+                admin = new HBaseAdmin(conf);
+                for (String table : oldTables) {
+                    if (admin.tableExists(table)) {
+                        HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table));
+                        String host = tableDescriptor.getValue(IRealizationConstants.HTableTag);
+                        if (metadataUrlPrefix.equalsIgnoreCase(host)) {
+                            if (admin.isTableEnabled(table)) {
+                                admin.disableTable(table);
+                            }
+                            admin.deleteTable(table);
+                            logger.debug("Dropped htable: " + table);
+                            output.append("HBase table " + table + " is dropped. \n");
+                        } else {
+                            logger.debug("Skip htable: " + table);
+                            output.append("Skip htable: " + table + ". \n");
+                        }
+                    }
+                }
+
+            } catch (IOException e) {
+                output.append("Got error when drop HBase table, exiting... \n");
+                // This should not block the merge job; Orphans should be cleaned up in StorageCleanupJob
+                return new ExecuteResult(ExecuteResult.State.ERROR, output.append(e.getLocalizedMessage()).toString());
+            } finally {
+                if (admin != null)
+                    try {
+                        admin.close();
+                    } catch (IOException e) {
+                        logger.error(e.getLocalizedMessage());
+                    }
+            }
+        }
+
+        return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+    }
+
+    public void setOldHTables(List<String> ids) {
+        setParam(OLD_HTABLES, StringUtils.join(ids, ","));
+    }
+
+    private List<String> getOldHTables() {
+        final String ids = getParam(OLD_HTABLES);
+        if (ids != null) {
+            final String[] splitted = StringUtils.split(ids, ",");
+            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
+            for (String id : splitted) {
+                result.add(id);
+            }
+            return result;
+        } else {
+            return Collections.emptyList();
+        }
+    }
+
+}

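The step's only parameter is a comma-separated list, so a quick illustrative round-trip (table names made up) shows the encoding; one consequence is that HTable names must not contain commas:

    MergeGCStep gc = new MergeGCStep();
    gc.setOldHTables(Lists.newArrayList("KYLIN_HTABLE_A", "KYLIN_HTABLE_B"));
    // stored internally as param "oldHTables" = "KYLIN_HTABLE_A,KYLIN_HTABLE_B",
    // and split back into a list by getOldHTables() at execution time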
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
index e938cbd..bea0e36 100644
--- a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
@@ -32,7 +32,7 @@ import org.apache.kylin.metadata.tuple.ITuple;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
 import org.apache.kylin.query.relnode.OLAPContext;
 import org.apache.kylin.storage.IStorageQuery;
-import org.apache.kylin.storage.StorageEngineFactory;
+import org.apache.kylin.storage.StorageFactory;
 import org.apache.kylin.storage.hbase.coprocessor.DictCodeSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -111,7 +111,7 @@ public class OLAPEnumerator implements Enumerator<Object[]> {
         olapContext.resetSQLDigest();
 
         // query storage engine
-        IStorageQuery storageEngine = StorageEngineFactory.getStorageEngine(olapContext.realization);
+        IStorageQuery storageEngine = StorageFactory.createQuery(olapContext.realization);
         ITupleIterator iterator = storageEngine.search(olapContext.storageContext, olapContext.getSQLDigest(), olapContext.returnTupleInfo);
         if (logger.isDebugEnabled()) {
             logger.debug("return TupleIterator...");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/main/java/org/apache/kylin/storage/ICachableStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/ICachableStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/ICachableStorageEngine.java
deleted file mode 100644
index 1fcd9b3..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/ICachableStorageEngine.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package org.apache.kylin.storage;
-
-import com.google.common.collect.Range;
-
-/**
- */
-public interface ICachableStorageEngine extends IStorageQuery {
-    /**
-     *
-     * being dynamic => getVolatilePeriod() return not null
-     * being dynamic => partition column of its realization not null
-     *
-     * @return true for static storage like cubes
-     *          false for dynamic storage like II
-     */
-    boolean isDynamic();
-
-    /**
-     * volatile period is the period of time in which the returned data is not stable
-     * e.g. inverted index's last several minutes' data is dynamic as time goes by.
-     * data in this period cannot be cached
-     *
-     * This method should not be called before ITupleIterator.close() is called
-     *
-     * @return null if the underlying storage guarantees the data is static
-     */
-    Range<Long> getVolatilePeriod();
-
-    /**
-     * get the uuid for the realization assigned to this storage engine
-     * @return
-     */
-    String getStorageUUID();
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/main/java/org/apache/kylin/storage/ICachableStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/ICachableStorageQuery.java b/storage/src/main/java/org/apache/kylin/storage/ICachableStorageQuery.java
new file mode 100644
index 0000000..179202e
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/ICachableStorageQuery.java
@@ -0,0 +1,33 @@
+package org.apache.kylin.storage;
+
+import com.google.common.collect.Range;
+
+/**
+ */
+public interface ICachableStorageQuery extends IStorageQuery {
+    /**
+     *
+     * being dynamic => getVolatilePeriod() returns non-null
+     * being dynamic => the partition column of its realization is non-null
+     *
+     * @return false for static storage like cubes,
+     *          true for dynamic storage like inverted index
+     */
+    boolean isDynamic();
+
+    /**
+     * The volatile period is the period of time in which the returned data is not yet stable,
+     * e.g. the inverted index's last several minutes of data keep changing as time goes by.
+     * Data in this period cannot be cached.
+     *
+     * This method should not be called before ITupleIterator.close() is called.
+     *
+     * @return null if the underlying storage guarantees the data is static
+     */
+    Range<Long> getVolatilePeriod();
+
+    /**
+     * Get the UUID of the realization that serves this query.
+     */
+    String getStorageUUID();
+}

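To make the contract concrete, a hypothetical static (cube-like) implementation; everything below except the interface and the referenced Kylin types is made up for illustration:

    import org.apache.kylin.metadata.realization.SQLDigest;
    import org.apache.kylin.metadata.tuple.ITupleIterator;
    import org.apache.kylin.storage.ICachableStorageQuery;
    import org.apache.kylin.storage.StorageContext;
    import org.apache.kylin.storage.tuple.TupleInfo;

    import com.google.common.collect.Range;

    // A static storage: results never change, so the cache layer may keep them indefinitely.
    public class StaticStorageQueryExample implements ICachableStorageQuery {

        @Override
        public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
            return ITupleIterator.EMPTY_TUPLE_ITERATOR; // placeholder; a real impl scans storage
        }

        @Override
        public boolean isDynamic() {
            return false; // static, like a cube
        }

        @Override
        public Range<Long> getVolatilePeriod() {
            return null; // null means all returned data is stable and cacheable
        }

        @Override
        public String getStorageUUID() {
            return "00000000-0000-0000-0000-000000000000"; // would be the realization's UUID
        }
    }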
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/main/java/org/apache/kylin/storage/IStorage.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/IStorage.java b/storage/src/main/java/org/apache/kylin/storage/IStorage.java
new file mode 100644
index 0000000..89b96e9
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/IStorage.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage;
+
+import org.apache.kylin.metadata.realization.IRealization;
+
+public interface IStorage {
+    
+    public IStorageQuery createStorageQuery(IRealization realization);
+
+    public <I> I adaptToBuildEngine(Class<I> engineInterface);
+}

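A sketch of how an implementation might satisfy the two methods; the class below is hypothetical (only IStorage, IStorageQuery, IRealization, CubeInstance and CubeStorageQuery come from the codebase):

    import org.apache.kylin.cube.CubeInstance;
    import org.apache.kylin.metadata.realization.IRealization;
    import org.apache.kylin.storage.IStorage;
    import org.apache.kylin.storage.IStorageQuery;
    import org.apache.kylin.storage.cube.CubeStorageQuery;

    public class CubeStorageExample implements IStorage {

        @Override
        public IStorageQuery createStorageQuery(IRealization realization) {
            // Query side: answer SQL digests against the stored cube.
            return new CubeStorageQuery((CubeInstance) realization);
        }

        @Override
        public <I> I adaptToBuildEngine(Class<I> engineInterface) {
            // Build side: a real implementation would hand back e.g. an IMROutput
            // adapter when the MapReduce engine asks for one; this sketch rejects all.
            throw new IllegalArgumentException("Cannot adapt to " + engineInterface);
        }
    }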
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java b/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
deleted file mode 100644
index 3afcd32..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.PartitionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.storage.cache.CacheFledgedDynamicStorageEngine;
-import org.apache.kylin.storage.cache.CacheFledgedStaticStorageEngine;
-import org.apache.kylin.storage.hbase.CubeStorageEngine;
-import org.apache.kylin.storage.hbase.InvertedIndexStorageEngine;
-import org.apache.kylin.storage.hybrid.HybridInstance;
-import org.apache.kylin.storage.hybrid.HybridStorageEngine;
-
-import com.google.common.base.Preconditions;
-
-/**
- * @author xjiang
- */
-public class StorageEngineFactory {
-    private static boolean allowStorageLayerCache = true;
-
-    public static IStorageQuery getStorageEngine(IRealization realization) {
-
-        if (realization.getType() == RealizationType.INVERTED_INDEX) {
-            ICachableStorageEngine ret = new InvertedIndexStorageEngine((IIInstance) realization);
-            if (allowStorageLayerCache) {
-                return wrapWithCache(ret, realization);
-            } else {
-                return ret;
-            }
-        } else if (realization.getType() == RealizationType.CUBE) {
-            ICachableStorageEngine ret = new CubeStorageEngine((CubeInstance) realization);
-            if (allowStorageLayerCache) {
-                return wrapWithCache(ret, realization);
-            } else {
-                return ret;
-            }
-        } else {
-            return new HybridStorageEngine((HybridInstance) realization);
-        }
-    }
-
-    private static IStorageQuery wrapWithCache(ICachableStorageEngine underlyingStorageEngine, IRealization realization) {
-        if (underlyingStorageEngine.isDynamic()) {
-            return new CacheFledgedDynamicStorageEngine(underlyingStorageEngine, getPartitionCol(realization));
-        } else {
-            return new CacheFledgedStaticStorageEngine(underlyingStorageEngine);
-        }
-    }
-
-    private static TblColRef getPartitionCol(IRealization realization) {
-        String modelName = realization.getModelName();
-        DataModelDesc dataModelDesc = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getDataModelDesc(modelName);
-        PartitionDesc partitionDesc = dataModelDesc.getPartitionDesc();
-        Preconditions.checkArgument(partitionDesc != null, "PartitionDesc for " + realization + " is null!");
-        TblColRef partitionColRef = partitionDesc.getPartitionDateColumnRef();
-        Preconditions.checkArgument(partitionColRef != null, "getPartitionDateColumnRef for " + realization + " is null");
-        return partitionColRef;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/StorageFactory.java b/storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
new file mode 100644
index 0000000..77e3010
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.storage.cache.CacheFledgedDynamicStorageEngine;
+import org.apache.kylin.storage.cache.CacheFledgedStaticStorageEngine;
+import org.apache.kylin.storage.hbase.CubeStorageQuery;
+import org.apache.kylin.storage.hbase.InvertedIndexStorageQuery;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.apache.kylin.storage.hybrid.HybridStorageEngine;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * @author xjiang
+ */
+public class StorageFactory {
+    
+    private static boolean allowStorageLayerCache = true;
+
+    public static IStorageQuery createQuery(IRealization realization) {
+
+        if (realization.getType() == RealizationType.INVERTED_INDEX) {
+            ICachableStorageQuery ret = new InvertedIndexStorageQuery((IIInstance) realization);
+            if (allowStorageLayerCache) {
+                return wrapWithCache(ret, realization);
+            } else {
+                return ret;
+            }
+        } else if (realization.getType() == RealizationType.CUBE) {
+            ICachableStorageQuery ret = new CubeStorageQuery((CubeInstance) realization);
+            if (allowStorageLayerCache) {
+                return wrapWithCache(ret, realization);
+            } else {
+                return ret;
+            }
+        } else {
+            return new HybridStorageEngine((HybridInstance) realization);
+        }
+    }
+
+    private static IStorageQuery wrapWithCache(ICachableStorageQuery underlyingStorageEngine, IRealization realization) {
+        if (underlyingStorageEngine.isDynamic()) {
+            return new CacheFledgedDynamicStorageEngine(underlyingStorageEngine, getPartitionCol(realization));
+        } else {
+            return new CacheFledgedStaticStorageEngine(underlyingStorageEngine);
+        }
+    }
+
+    private static TblColRef getPartitionCol(IRealization realization) {
+        String modelName = realization.getModelName();
+        DataModelDesc dataModelDesc = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getDataModelDesc(modelName);
+        PartitionDesc partitionDesc = dataModelDesc.getPartitionDesc();
+        Preconditions.checkArgument(partitionDesc != null, "PartitionDesc for " + realization + " is null!");
+        TblColRef partitionColRef = partitionDesc.getPartitionDateColumnRef();
+        Preconditions.checkArgument(partitionColRef != null, "getPartitionDateColumnRef for " + realization + " is null");
+        return partitionColRef;
+    }
+    
+}

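In effect, createQuery dispatches on realization type and, by default, wraps the result in the caching decorator:

    // Summary of the dispatch above, for a given IRealization r:
    //   CUBE           -> wrapWithCache(new CubeStorageQuery(cube))        // static cache wrapper
    //   INVERTED_INDEX -> wrapWithCache(new InvertedIndexStorageQuery(ii)) // dynamic cache wrapper
    //   otherwise      -> new HybridStorageEngine(hybrid)                  // no cache wrapper
    IStorageQuery q = StorageFactory.createQuery(r);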
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
index 34a8066..61e008f 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
@@ -10,7 +10,7 @@ import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
 
 import org.apache.kylin.metadata.realization.StreamSQLDigest;
 import org.apache.kylin.metadata.tuple.TeeTupleItrListener;
-import org.apache.kylin.storage.ICachableStorageEngine;
+import org.apache.kylin.storage.ICachableStorageQuery;
 import org.apache.kylin.storage.IStorageQuery;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -24,10 +24,10 @@ public abstract class AbstractCacheFledgedStorageEngine implements IStorageQuery
     protected static CacheManager CACHE_MANAGER;
 
     protected boolean queryCacheExists;
-    protected ICachableStorageEngine underlyingStorage;
+    protected ICachableStorageQuery underlyingStorage;
     protected StreamSQLDigest streamSQLDigest;
 
-    public AbstractCacheFledgedStorageEngine(ICachableStorageEngine underlyingStorage) {
+    public AbstractCacheFledgedStorageEngine(ICachableStorageQuery underlyingStorage) {
         this.underlyingStorage = underlyingStorage;
         this.makeCacheIfNecessary(underlyingStorage.getStorageUUID());
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicStorageEngine.java
index 0cfa382..86e2e88 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicStorageEngine.java
@@ -12,7 +12,7 @@ import org.apache.kylin.metadata.realization.SQLDigest;
 import org.apache.kylin.metadata.realization.SQLDigestUtil;
 import org.apache.kylin.metadata.realization.StreamSQLDigest;
 import org.apache.kylin.metadata.tuple.*;
-import org.apache.kylin.storage.ICachableStorageEngine;
+import org.apache.kylin.storage.ICachableStorageQuery;
 import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.hbase.coprocessor.endpoint.TsConditionExtractor;
 import org.apache.kylin.storage.tuple.TupleInfo;
@@ -30,7 +30,7 @@ public class CacheFledgedDynamicStorageEngine extends AbstractCacheFledgedStorag
 
     private Range<Long> ts;
 
-    public CacheFledgedDynamicStorageEngine(ICachableStorageEngine underlyingStorage, TblColRef partitionColRef) {
+    public CacheFledgedDynamicStorageEngine(ICachableStorageQuery underlyingStorage, TblColRef partitionColRef) {
         super(underlyingStorage);
         this.partitionColRef = partitionColRef;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticStorageEngine.java
index 60b8a1b..1e95722 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticStorageEngine.java
@@ -9,7 +9,7 @@ import org.apache.kylin.metadata.tuple.ITuple;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
 import org.apache.kylin.metadata.tuple.SimpleTupleIterator;
 import org.apache.kylin.metadata.tuple.TeeTupleIterator;
-import org.apache.kylin.storage.ICachableStorageEngine;
+import org.apache.kylin.storage.ICachableStorageQuery;
 import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.tuple.TupleInfo;
 import org.slf4j.Logger;
@@ -20,7 +20,7 @@ import java.util.List;
 public class CacheFledgedStaticStorageEngine extends AbstractCacheFledgedStorageEngine {
     private static final Logger logger = LoggerFactory.getLogger(CacheFledgedStaticStorageEngine.class);
 
-    public CacheFledgedStaticStorageEngine(ICachableStorageEngine underlyingStorage) {
+    public CacheFledgedStaticStorageEngine(ICachableStorageQuery underlyingStorage) {
         super(underlyingStorage);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageEngine.java
deleted file mode 100644
index 549961f..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageEngine.java
+++ /dev/null
@@ -1,371 +0,0 @@
-package org.apache.kylin.storage.cube;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
-import org.apache.kylin.dict.lookup.LookupStringTable;
-import org.apache.kylin.metadata.filter.ColumnTupleFilter;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.LogicalTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.storage.ICachableStorageEngine;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.hbase.DerivedFilterTranslator;
-import org.apache.kylin.storage.tuple.TupleInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
-
-public class CubeStorageEngine implements ICachableStorageEngine {
-
-    private static final Logger logger = LoggerFactory.getLogger(CubeStorageEngine.class);
-
-    private static final long MEM_BUDGET_PER_QUERY = 3L * 1024 * 1024 * 1024; // 3G
-
-    private final CubeInstance cubeInstance;
-    private final CubeDesc cubeDesc;
-
-    public CubeStorageEngine(CubeInstance cube) {
-        this.cubeInstance = cube;
-        this.cubeDesc = cube.getDescriptor();
-    }
-
-    @Override
-    public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
-        Collection<TblColRef> groups = sqlDigest.groupbyColumns;
-        TupleFilter filter = sqlDigest.filter;
-
-        // build dimension & metrics
-        Set<TblColRef> dimensions = new LinkedHashSet<TblColRef>();
-        Set<FunctionDesc> metrics = new LinkedHashSet<FunctionDesc>();
-        buildDimensionsAndMetrics(sqlDigest, dimensions, metrics);
-
-        // all dimensions = groups + filter dimensions
-        Set<TblColRef> filterDims = Sets.newHashSet(dimensions);
-        filterDims.removeAll(groups);
-
-        // expand derived (xxxD means contains host columns only, derived columns were translated)
-        Set<TblColRef> derivedPostAggregation = Sets.newHashSet();
-        Set<TblColRef> groupsD = expandDerived(groups, derivedPostAggregation);
-        Set<TblColRef> filterDimsD = expandDerived(filterDims, derivedPostAggregation);
-        filterDimsD.removeAll(groupsD);
-        derivedPostAggregation.removeAll(groups);
-
-        // identify cuboid
-        Set<TblColRef> dimensionsD = new LinkedHashSet<TblColRef>();
-        dimensionsD.addAll(groupsD);
-        dimensionsD.addAll(filterDimsD);
-        Cuboid cuboid = identifyCuboid(dimensionsD);
-        context.setCuboid(cuboid);
-
-        // isExactAggregation? meaning: tuples returned from storage requires no further aggregation in query engine
-        Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
-        boolean isExactAggregation = isExactAggregation(cuboid, groups, filterDimsD, singleValuesD, derivedPostAggregation);
-        context.setExactAggregation(isExactAggregation);
-        
-        if (isExactAggregation) {
-            metrics = replaceHolisticCountDistinct(metrics);
-        }
-
-        // replace derived columns in filter with host columns; columns on loosened condition must be added to group by
-        TupleFilter filterD = translateDerived(filter, groupsD);
-
-        setThreshold(dimensionsD, metrics, context); // set cautious threshold to prevent out of memory
-        // TODO enable coprocessor
-//        setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial
-        setLimit(filter, context);
-
-        List<CubeScanner> scanners = Lists.newArrayList();
-        for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
-            scanners.add(new CubeScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD));
-        }
-        
-        if (scanners.isEmpty())
-            return ITupleIterator.EMPTY_TUPLE_ITERATOR;
-        
-        return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo);
-    }
-
-    private void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
-        for (FunctionDesc func : sqlDigest.aggregations) {
-            if (!func.isDimensionAsMetric()) {
-                // use the FunctionDesc from cube desc as much as possible, that has more info such as HLLC precision
-                metrics.add(findAggrFuncFromCubeDesc(func));
-            }
-        }
-
-        for (TblColRef column : sqlDigest.allColumns) {
-            // skip measure columns
-            if (sqlDigest.metricColumns.contains(column)) {
-                continue;
-            }
-            dimensions.add(column);
-        }
-    }
-    
-    private FunctionDesc findAggrFuncFromCubeDesc(FunctionDesc aggrFunc) {
-        for (MeasureDesc measure : cubeDesc.getMeasures()) {
-            if (measure.getFunction().equals(aggrFunc))
-                return measure.getFunction();
-        }
-        return aggrFunc;
-    }
-
-    private Set<TblColRef> expandDerived(Collection<TblColRef> cols, Set<TblColRef> derivedPostAggregation) {
-        Set<TblColRef> expanded = Sets.newHashSet();
-        for (TblColRef col : cols) {
-            if (cubeDesc.isDerived(col)) {
-                DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
-                for (TblColRef hostCol : hostInfo.columns) {
-                    expanded.add(hostCol);
-                    if (hostInfo.isOneToOne == false)
-                        derivedPostAggregation.add(hostCol);
-                }
-            } else {
-                expanded.add(col);
-            }
-        }
-        return expanded;
-    }
-
-    private Cuboid identifyCuboid(Set<TblColRef> dimensions) {
-        long cuboidID = 0;
-        for (TblColRef column : dimensions) {
-            int index = cubeDesc.getRowkey().getColumnBitIndex(column);
-            cuboidID |= 1L << index;
-        }
-        return Cuboid.findById(cubeDesc, cuboidID);
-    }
-
-    @SuppressWarnings("unchecked")
-    private Set<TblColRef> findSingleValueColumns(TupleFilter filter) {
-        Collection<? extends TupleFilter> toCheck;
-        if (filter instanceof CompareTupleFilter) {
-            toCheck = Collections.singleton(filter);
-        } else if (filter instanceof LogicalTupleFilter && filter.getOperator() == FilterOperatorEnum.AND) {
-            toCheck = filter.getChildren();
-        } else {
-            return (Set<TblColRef>) Collections.EMPTY_SET;
-        }
-
-        Set<TblColRef> result = Sets.newHashSet();
-        for (TupleFilter f : toCheck) {
-            if (f instanceof CompareTupleFilter) {
-                CompareTupleFilter compFilter = (CompareTupleFilter) f;
-                // is COL=const ?
-                if (compFilter.getOperator() == FilterOperatorEnum.EQ && compFilter.getValues().size() == 1 && compFilter.getColumn() != null) {
-                    result.add(compFilter.getColumn());
-                }
-            }
-        }
-
-        // expand derived
-        Set<TblColRef> resultD = Sets.newHashSet();
-        for (TblColRef col : result) {
-            if (cubeDesc.isDerived(col)) {
-                DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
-                if (hostInfo.isOneToOne) {
-                    for (TblColRef hostCol : hostInfo.columns) {
-                        resultD.add(hostCol);
-                    }
-                }
-                //if not one2one, it will be pruned
-            } else {
-                resultD.add(col);
-            }
-        }
-        return resultD;
-    }
-
-    private boolean isExactAggregation(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation) {
-        boolean exact = true;
-
-        if (cuboid.requirePostAggregation()) {
-            exact = false;
-            logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId());
-        }
-
-        // derived aggregation is bad, unless expanded columns are already in group by
-        if (groups.containsAll(derivedPostAggregation) == false) {
-            exact = false;
-            logger.info("exactAggregation is false because derived column require post aggregation: " + derivedPostAggregation);
-        }
-
-        // other columns (from filter) is bad, unless they are ensured to have single value
-        if (singleValuesD.containsAll(othersD) == false) {
-            exact = false;
-            logger.info("exactAggregation is false because some column not on group by: " + othersD //
-                    + " (single value column: " + singleValuesD + ")");
-        }
-
-        if (exact) {
-            logger.info("exactAggregation is true");
-        }
-        return exact;
-    }
-
-    private Set<FunctionDesc> replaceHolisticCountDistinct(Set<FunctionDesc> metrics) {
-        // for count distinct, try use its holistic version if possible
-        Set<FunctionDesc> result = new LinkedHashSet<FunctionDesc>();
-        for (FunctionDesc metric : metrics) {
-            if (metric.isCountDistinct() == false) {
-                result.add(metric);
-                continue;
-            }
-            
-            FunctionDesc holisticVersion = null;
-            for (MeasureDesc measure : cubeDesc.getMeasures()) {
-                FunctionDesc measureFunc = measure.getFunction();
-                if (measureFunc.equals(metric) && measureFunc.isHolisticCountDistinct()) {
-                    holisticVersion = measureFunc;
-                }
-            }
-            result.add(holisticVersion == null ? metric : holisticVersion);
-        }
-        return result;
-    }
-
-    @SuppressWarnings("unchecked")
-    private TupleFilter translateDerived(TupleFilter filter, Set<TblColRef> collector) {
-        if (filter == null)
-            return filter;
-
-        if (filter instanceof CompareTupleFilter) {
-            return translateDerivedInCompare((CompareTupleFilter) filter, collector);
-        }
-
-        List<TupleFilter> children = (List<TupleFilter>) filter.getChildren();
-        List<TupleFilter> newChildren = Lists.newArrayListWithCapacity(children.size());
-        boolean modified = false;
-        for (TupleFilter child : children) {
-            TupleFilter translated = translateDerived(child, collector);
-            newChildren.add(translated);
-            if (child != translated)
-                modified = true;
-        }
-        if (modified) {
-            filter = replaceChildren(filter, newChildren);
-        }
-        return filter;
-    }
-
-    private TupleFilter replaceChildren(TupleFilter filter, List<TupleFilter> newChildren) {
-        if (filter instanceof LogicalTupleFilter) {
-            LogicalTupleFilter r = new LogicalTupleFilter(filter.getOperator());
-            r.addChildren(newChildren);
-            return r;
-        } else
-            throw new IllegalStateException("Cannot replaceChildren on " + filter);
-    }
-
-    private TupleFilter translateDerivedInCompare(CompareTupleFilter compf, Set<TblColRef> collector) {
-        if (compf.getColumn() == null || compf.getValues().isEmpty())
-            return compf;
-
-        TblColRef derived = compf.getColumn();
-        if (cubeDesc.isDerived(derived) == false)
-            return compf;
-
-        DeriveInfo hostInfo = cubeDesc.getHostInfo(derived);
-        CubeManager cubeMgr = CubeManager.getInstance(this.cubeInstance.getConfig());
-        CubeSegment seg = cubeInstance.getLatestReadySegment();
-        LookupStringTable lookup = cubeMgr.getLookupTable(seg, hostInfo.dimension);
-        Pair<TupleFilter, Boolean> translated = DerivedFilterTranslator.translate(lookup, hostInfo, compf);
-        TupleFilter translatedFilter = translated.getFirst();
-        boolean loosened = translated.getSecond();
-        if (loosened) {
-            collectColumnsRecursively(translatedFilter, collector);
-        }
-        return translatedFilter;
-    }
-
-    private void collectColumnsRecursively(TupleFilter filter, Set<TblColRef> collector) {
-        if (filter instanceof ColumnTupleFilter) {
-            collectColumns(((ColumnTupleFilter) filter).getColumn(), collector);
-        }
-        for (TupleFilter child : filter.getChildren()) {
-            collectColumnsRecursively(child, collector);
-        }
-    }
-
-    private void collectColumns(TblColRef col, Set<TblColRef> collector) {
-        if (cubeDesc.isDerived(col)) {
-            DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
-            for (TblColRef h : hostInfo.columns)
-                collector.add(h);
-        } else {
-            collector.add(col);
-        }
-    }
-
-    private void setThreshold(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, StorageContext context) {
-        boolean hasMemHungryCountDistinct = false;
-        for (FunctionDesc func : metrics) {
-            if (func.isCountDistinct() && !func.isHolisticCountDistinct()) {
-                hasMemHungryCountDistinct = true;
-            }
-        }
-
-        // need to limit the memory usage for memory hungry count distinct
-        if (hasMemHungryCountDistinct == false) {
-            return;
-        }
-
-        int rowSizeEst = dimensions.size() * 3;
-        for (FunctionDesc func : metrics) {
-            rowSizeEst += func.getReturnDataType().getSpaceEstimate();
-        }
-
-        long rowEst = MEM_BUDGET_PER_QUERY / rowSizeEst;
-        context.setThreshold((int) rowEst);
-    }
-
-    private void setLimit(TupleFilter filter, StorageContext context) {
-        boolean goodAggr = context.isExactAggregation();
-        boolean goodFilter = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled());
-        boolean goodSort = context.hasSort() == false;
-        if (goodAggr && goodFilter && goodSort) {
-            logger.info("Enable limit " + context.getLimit());
-            context.enableLimit();
-        }
-    }
-    
-    // ============================================================================
-
-    @Override
-    public Range<Long> getVolatilePeriod() {
-        return null;
-    }
-
-    @Override
-    public String getStorageUUID() {
-        return cubeInstance.getUuid();
-    }
-
-
-    @Override
-    public boolean isDynamic() {
-        return false;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageQuery.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageQuery.java
new file mode 100644
index 0000000..8164310
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageQuery.java
@@ -0,0 +1,371 @@
+package org.apache.kylin.storage.cube;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
+import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.storage.ICachableStorageQuery;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.hbase.DerivedFilterTranslator;
+import org.apache.kylin.storage.tuple.TupleInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
+
+public class CubeStorageQuery implements ICachableStorageQuery {
+
+    private static final Logger logger = LoggerFactory.getLogger(CubeStorageQuery.class);
+
+    private static final long MEM_BUDGET_PER_QUERY = 3L * 1024 * 1024 * 1024; // 3G
+
+    private final CubeInstance cubeInstance;
+    private final CubeDesc cubeDesc;
+
+    public CubeStorageQuery(CubeInstance cube) {
+        this.cubeInstance = cube;
+        this.cubeDesc = cube.getDescriptor();
+    }
+
+    @Override
+    public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
+        Collection<TblColRef> groups = sqlDigest.groupbyColumns;
+        TupleFilter filter = sqlDigest.filter;
+
+        // build dimension & metrics
+        Set<TblColRef> dimensions = new LinkedHashSet<TblColRef>();
+        Set<FunctionDesc> metrics = new LinkedHashSet<FunctionDesc>();
+        buildDimensionsAndMetrics(sqlDigest, dimensions, metrics);
+
+        // all dimensions = groups + filter dimensions
+        Set<TblColRef> filterDims = Sets.newHashSet(dimensions);
+        filterDims.removeAll(groups);
+
+        // expand derived (xxxD means contains host columns only, derived columns were translated)
+        Set<TblColRef> derivedPostAggregation = Sets.newHashSet();
+        Set<TblColRef> groupsD = expandDerived(groups, derivedPostAggregation);
+        Set<TblColRef> filterDimsD = expandDerived(filterDims, derivedPostAggregation);
+        filterDimsD.removeAll(groupsD);
+        derivedPostAggregation.removeAll(groups);
+
+        // identify cuboid
+        Set<TblColRef> dimensionsD = new LinkedHashSet<TblColRef>();
+        dimensionsD.addAll(groupsD);
+        dimensionsD.addAll(filterDimsD);
+        Cuboid cuboid = identifyCuboid(dimensionsD);
+        context.setCuboid(cuboid);
+
+        // isExactAggregation? meaning: tuples returned from storage require no further aggregation in query engine
+        Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
+        boolean isExactAggregation = isExactAggregation(cuboid, groups, filterDimsD, singleValuesD, derivedPostAggregation);
+        context.setExactAggregation(isExactAggregation);
+        
+        if (isExactAggregation) {
+            metrics = replaceHolisticCountDistinct(metrics);
+        }
+
+        // replace derived columns in filter with host columns; columns on loosened condition must be added to group by
+        TupleFilter filterD = translateDerived(filter, groupsD);
+
+        setThreshold(dimensionsD, metrics, context); // set a cautious threshold to prevent out-of-memory
+        // TODO enable coprocessor
+//        setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial
+        setLimit(filter, context);
+
+        List<CubeScanner> scanners = Lists.newArrayList();
+        for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
+            scanners.add(new CubeScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD));
+        }
+        
+        if (scanners.isEmpty())
+            return ITupleIterator.EMPTY_TUPLE_ITERATOR;
+        
+        return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo);
+    }
+
+    private void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
+        for (FunctionDesc func : sqlDigest.aggregations) {
+            if (!func.isDimensionAsMetric()) {
+                // use the FunctionDesc from cube desc as much as possible, which has more info such as HLLC precision
+                metrics.add(findAggrFuncFromCubeDesc(func));
+            }
+        }
+
+        for (TblColRef column : sqlDigest.allColumns) {
+            // skip measure columns
+            if (sqlDigest.metricColumns.contains(column)) {
+                continue;
+            }
+            dimensions.add(column);
+        }
+    }
+    
+    private FunctionDesc findAggrFuncFromCubeDesc(FunctionDesc aggrFunc) {
+        for (MeasureDesc measure : cubeDesc.getMeasures()) {
+            if (measure.getFunction().equals(aggrFunc))
+                return measure.getFunction();
+        }
+        return aggrFunc;
+    }
+
+    private Set<TblColRef> expandDerived(Collection<TblColRef> cols, Set<TblColRef> derivedPostAggregation) {
+        Set<TblColRef> expanded = Sets.newHashSet();
+        for (TblColRef col : cols) {
+            if (cubeDesc.isDerived(col)) {
+                DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
+                for (TblColRef hostCol : hostInfo.columns) {
+                    expanded.add(hostCol);
+                    if (hostInfo.isOneToOne == false)
+                        derivedPostAggregation.add(hostCol);
+                }
+            } else {
+                expanded.add(col);
+            }
+        }
+        return expanded;
+    }
+
+    private Cuboid identifyCuboid(Set<TblColRef> dimensions) {
+        long cuboidID = 0;
+        for (TblColRef column : dimensions) {
+            int index = cubeDesc.getRowkey().getColumnBitIndex(column);
+            cuboidID |= 1L << index;
+        }
+        return Cuboid.findById(cubeDesc, cuboidID);
+    }
+
+    @SuppressWarnings("unchecked")
+    private Set<TblColRef> findSingleValueColumns(TupleFilter filter) {
+        Collection<? extends TupleFilter> toCheck;
+        if (filter instanceof CompareTupleFilter) {
+            toCheck = Collections.singleton(filter);
+        } else if (filter instanceof LogicalTupleFilter && filter.getOperator() == FilterOperatorEnum.AND) {
+            toCheck = filter.getChildren();
+        } else {
+            return (Set<TblColRef>) Collections.EMPTY_SET;
+        }
+
+        Set<TblColRef> result = Sets.newHashSet();
+        for (TupleFilter f : toCheck) {
+            if (f instanceof CompareTupleFilter) {
+                CompareTupleFilter compFilter = (CompareTupleFilter) f;
+                // is COL=const ?
+                if (compFilter.getOperator() == FilterOperatorEnum.EQ && compFilter.getValues().size() == 1 && compFilter.getColumn() != null) {
+                    result.add(compFilter.getColumn());
+                }
+            }
+        }
+
+        // expand derived
+        Set<TblColRef> resultD = Sets.newHashSet();
+        for (TblColRef col : result) {
+            if (cubeDesc.isDerived(col)) {
+                DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
+                if (hostInfo.isOneToOne) {
+                    for (TblColRef hostCol : hostInfo.columns) {
+                        resultD.add(hostCol);
+                    }
+                }
+                // if not one-to-one, it will be pruned
+            } else {
+                resultD.add(col);
+            }
+        }
+        return resultD;
+    }
+
+    private boolean isExactAggregation(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation) {
+        boolean exact = true;
+
+        if (cuboid.requirePostAggregation()) {
+            exact = false;
+            logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId());
+        }
+
+        // derived aggregation is bad, unless expanded columns are already in group by
+        if (groups.containsAll(derivedPostAggregation) == false) {
+            exact = false;
+            logger.info("exactAggregation is false because derived column require post aggregation: " + derivedPostAggregation);
+        }
+
+        // other columns (from filter) are bad, unless they are ensured to have a single value
+        if (singleValuesD.containsAll(othersD) == false) {
+            exact = false;
+            logger.info("exactAggregation is false because some column not on group by: " + othersD //
+                    + " (single value column: " + singleValuesD + ")");
+        }
+
+        if (exact) {
+            logger.info("exactAggregation is true");
+        }
+        return exact;
+    }
+
+    private Set<FunctionDesc> replaceHolisticCountDistinct(Set<FunctionDesc> metrics) {
+        // for count distinct, try to use its holistic version if possible
+        Set<FunctionDesc> result = new LinkedHashSet<FunctionDesc>();
+        for (FunctionDesc metric : metrics) {
+            if (metric.isCountDistinct() == false) {
+                result.add(metric);
+                continue;
+            }
+            
+            FunctionDesc holisticVersion = null;
+            for (MeasureDesc measure : cubeDesc.getMeasures()) {
+                FunctionDesc measureFunc = measure.getFunction();
+                if (measureFunc.equals(metric) && measureFunc.isHolisticCountDistinct()) {
+                    holisticVersion = measureFunc;
+                }
+            }
+            result.add(holisticVersion == null ? metric : holisticVersion);
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private TupleFilter translateDerived(TupleFilter filter, Set<TblColRef> collector) {
+        if (filter == null)
+            return filter;
+
+        if (filter instanceof CompareTupleFilter) {
+            return translateDerivedInCompare((CompareTupleFilter) filter, collector);
+        }
+
+        List<TupleFilter> children = (List<TupleFilter>) filter.getChildren();
+        List<TupleFilter> newChildren = Lists.newArrayListWithCapacity(children.size());
+        boolean modified = false;
+        for (TupleFilter child : children) {
+            TupleFilter translated = translateDerived(child, collector);
+            newChildren.add(translated);
+            if (child != translated)
+                modified = true;
+        }
+        if (modified) {
+            filter = replaceChildren(filter, newChildren);
+        }
+        return filter;
+    }
+
+    private TupleFilter replaceChildren(TupleFilter filter, List<TupleFilter> newChildren) {
+        if (filter instanceof LogicalTupleFilter) {
+            LogicalTupleFilter r = new LogicalTupleFilter(filter.getOperator());
+            r.addChildren(newChildren);
+            return r;
+        } else
+            throw new IllegalStateException("Cannot replaceChildren on " + filter);
+    }
+
+    private TupleFilter translateDerivedInCompare(CompareTupleFilter compf, Set<TblColRef> collector) {
+        if (compf.getColumn() == null || compf.getValues().isEmpty())
+            return compf;
+
+        TblColRef derived = compf.getColumn();
+        if (cubeDesc.isDerived(derived) == false)
+            return compf;
+
+        DeriveInfo hostInfo = cubeDesc.getHostInfo(derived);
+        CubeManager cubeMgr = CubeManager.getInstance(this.cubeInstance.getConfig());
+        CubeSegment seg = cubeInstance.getLatestReadySegment();
+        LookupStringTable lookup = cubeMgr.getLookupTable(seg, hostInfo.dimension);
+        Pair<TupleFilter, Boolean> translated = DerivedFilterTranslator.translate(lookup, hostInfo, compf);
+        TupleFilter translatedFilter = translated.getFirst();
+        boolean loosened = translated.getSecond();
+        if (loosened) {
+            collectColumnsRecursively(translatedFilter, collector);
+        }
+        return translatedFilter;
+    }
+
+    private void collectColumnsRecursively(TupleFilter filter, Set<TblColRef> collector) {
+        if (filter instanceof ColumnTupleFilter) {
+            collectColumns(((ColumnTupleFilter) filter).getColumn(), collector);
+        }
+        for (TupleFilter child : filter.getChildren()) {
+            collectColumnsRecursively(child, collector);
+        }
+    }
+
+    private void collectColumns(TblColRef col, Set<TblColRef> collector) {
+        if (cubeDesc.isDerived(col)) {
+            DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
+            for (TblColRef h : hostInfo.columns)
+                collector.add(h);
+        } else {
+            collector.add(col);
+        }
+    }
+
+    private void setThreshold(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, StorageContext context) {
+        boolean hasMemHungryCountDistinct = false;
+        for (FunctionDesc func : metrics) {
+            if (func.isCountDistinct() && !func.isHolisticCountDistinct()) {
+                hasMemHungryCountDistinct = true;
+            }
+        }
+
+        // need to limit the memory usage for memory-hungry count distinct
+        if (hasMemHungryCountDistinct == false) {
+            return;
+        }
+
+        int rowSizeEst = dimensions.size() * 3;
+        for (FunctionDesc func : metrics) {
+            rowSizeEst += func.getReturnDataType().getSpaceEstimate();
+        }
+
+        long rowEst = MEM_BUDGET_PER_QUERY / rowSizeEst;
+        context.setThreshold((int) rowEst);
+    }
+
+    private void setLimit(TupleFilter filter, StorageContext context) {
+        boolean goodAggr = context.isExactAggregation();
+        boolean goodFilter = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled());
+        boolean goodSort = context.hasSort() == false;
+        if (goodAggr && goodFilter && goodSort) {
+            logger.info("Enable limit " + context.getLimit());
+            context.enableLimit();
+        }
+    }
+    
+    // ============================================================================
+
+    @Override
+    public Range<Long> getVolatilePeriod() {
+        return null;
+    }
+
+    @Override
+    public String getStorageUUID() {
+        return cubeInstance.getUuid();
+    }
+
+
+    @Override
+    public boolean isDynamic() {
+        return false;
+    }
+
+}

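As a worked example of identifyCuboid's bit arithmetic, assume a hypothetical rowkey where SITE_ID, CATEGORY and SELLER_ID sit at bit indexes 0, 1 and 2. Querying the dimensions {SELLER_ID, SITE_ID} gives:

    long cuboidID = (1L << 2) | (1L << 0); // = 0b101 = 5
    // Cuboid.findById(cubeDesc, 5) then resolves cuboid 5, or a valid parent
    // cuboid covering it, per the cube's aggregation rules.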

[5/5] incubator-kylin git commit: KYLIN-878 half way

Posted by li...@apache.org.
KYLIN-878 half way


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/9a82f39b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/9a82f39b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/9a82f39b

Branch: refs/heads/KYLIN-878
Commit: 9a82f39bdfc60347e259547be6460f958986a0e8
Parents: 4d3af3a
Author: Li, Yang <ya...@ebay.com>
Authored: Mon Jul 20 08:59:10 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Mon Jul 20 08:59:57 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/util/BytesSplitter.java |   9 +-
 .../apache/kylin/engine/BuildEngineFactory.java |   2 +-
 .../kylin/engine/mr/BatchCubingJobBuilder.java  |  10 +-
 .../kylin/engine/mr/BatchCubingJobBuilder2.java | 100 +++++++++++++++++
 .../kylin/engine/mr/BatchMergeJobBuilder2.java  | 102 ++++++++++++++++++
 .../org/apache/kylin/engine/mr/IMROutput.java   |  16 ++-
 .../org/apache/kylin/engine/mr/IMROutput2.java  |  53 +++++++++
 .../kylin/engine/mr/JobBuilderSupport.java      |  15 +--
 .../kylin/engine/mr/MRBatchCubingEngine2.java   |  47 ++++++++
 .../java/org/apache/kylin/engine/mr/MRUtil.java |  10 ++
 .../engine/mr2/BatchCubingJobBuilder2.java      | 102 ------------------
 .../kylin/engine/mr2/BatchMergeJobBuilder2.java | 107 -------------------
 .../org/apache/kylin/engine/mr2/IMROutput2.java |   5 -
 .../kylin/engine/mr2/MRBatchCubingEngine2.java  |  48 ---------
 .../apache/kylin/job/hadoop/cube/CuboidJob.java |  45 +++++---
 .../job/hadoop/cube/HiveToBaseCuboidMapper.java |  28 ++++-
 .../kylin/storage/hbase/HBaseMROutput2.java     |  71 ++++++++++++
 17 files changed, 467 insertions(+), 303 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java b/common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java
index bd16246..7249dcf 100644
--- a/common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java
+++ b/common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * @author xjiang
  */
 public class BytesSplitter {
     private static final Logger logger = LoggerFactory.getLogger(BytesSplitter.class);
@@ -79,6 +78,14 @@ public class BytesSplitter {
 
         return bufferSize;
     }
+    
+    public void setBuffers(byte[][] buffers) {
+        for (int i = 0; i < buffers.length; i++) {
+            splitBuffers[i].value = buffers[i];
+            splitBuffers[i].length = buffers[i].length;
+        }
+        this.bufferSize = buffers.length;
+    }
 
     public byte inferByteRowDelimiter(byte[] bytes, int byteLen, int expectedSplits) throws IOException {
 

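The new setBuffers() lets callers bypass delimiter-based splitting when fields arrive
already parsed; a minimal usage sketch (field values are made up, and it assumes the
splitter was constructed with capacity for at least this many parts):

    // java.nio.charset.StandardCharsets avoids the checked UnsupportedEncodingException
    byte[][] fields = new byte[][] {
            "2015-07-20".getBytes(StandardCharsets.UTF_8),
            "CN".getBytes(StandardCharsets.UTF_8),
            "42".getBytes(StandardCharsets.UTF_8)
    };
    bytesSplitter.setBuffers(fields);  // splitBuffers[i] wraps fields[i]; bufferSize == 3
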
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java b/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
index 721b85a..d24c99c 100644
--- a/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
+++ b/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
@@ -21,7 +21,7 @@ package org.apache.kylin.engine;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.MRBatchCubingEngine;
-import org.apache.kylin.engine.mr2.MRBatchCubingEngine2;
+import org.apache.kylin.engine.mr.MRBatchCubingEngine2;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 
 public class BuildEngineFactory {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index 0ff3bef..a39ac74 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -19,7 +19,6 @@
 package org.apache.kylin.engine.mr;
 
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
 import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
 import org.apache.kylin.job.common.MapReduceExecutable;
@@ -42,13 +41,12 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
         final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
         final String jobId = result.getId();
         final String cuboidRootPath = getCuboidRootPath(jobId);
-        final CubeJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
 
         // Phase 1: Create Flat Table
         inputSide.addStepPhase1_CreateFlatTable(result);
 
         // Phase 2: Build Dictionary
-        result.addTask(createFactDistinctColumnsStep(flatHiveTableDesc, jobId));
+        result.addTask(createFactDistinctColumnsStep(jobId));
         result.addTask(createBuildDictionaryStep(jobId));
 
         // Phase 3: Build Cube
@@ -56,7 +54,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
         final int totalRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
         final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount);
         // base cuboid step
-        result.addTask(createBaseCuboidStep(flatHiveTableDesc, cuboidOutputTempPath, jobId));
+        result.addTask(createBaseCuboidStep(cuboidOutputTempPath, jobId));
         // n dim cuboid steps
         for (int i = 1; i <= groupRowkeyColumnsCount; i++) {
             int dimNum = totalRowkeyColumnsCount - i;
@@ -72,7 +70,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
         return result;
     }
 
-    private MapReduceExecutable createBaseCuboidStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String[] cuboidOutputTempPath, String jobId) {
+    private MapReduceExecutable createBaseCuboidStep(String[] cuboidOutputTempPath, String jobId) {
         // base cuboid job
         MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
 
@@ -83,7 +81,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
 
         appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
         appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", getFlatHiveTableLocation(flatHiveTableDesc, jobId));
+        appendExecCmdParameters(cmd, "input", ""); // empty input path signals CuboidJob to read the flat table via IMRTableInputFormat
         appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]);
         appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + seg.getCubeInstance().getName());
         appendExecCmdParameters(cmd, "level", "0");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
new file mode 100644
index 0000000..a83c596
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
+import org.apache.kylin.job.common.MapReduceExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.hadoop.cubev2.InMemCuboidJob;
+import org.apache.kylin.job.hadoop.cubev2.SaveStatisticsStep;
+
+public class BatchCubingJobBuilder2 extends JobBuilderSupport {
+
+    private final IMRBatchCubingInputSide inputSide;
+    private final IMRBatchCubingOutputSide2 outputSide;
+
+    public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
+        super(newSegment, submitter);
+        this.inputSide = MRUtil.getBatchCubingInputSide(seg);
+        this.outputSide = MRUtil.getBatchCubingOutputSide2(seg);
+    }
+
+    public CubingJob build() {
+        final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
+        final String jobId = result.getId();
+        final CubeJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
+
+        // Phase 1: Create Flat Table
+        inputSide.addStepPhase1_CreateFlatTable(result);
+
+        // Phase 2: Build Dictionary
+        result.addTask(createFactDistinctColumnsStepWithStats(jobId));
+        result.addTask(createBuildDictionaryStep(jobId));
+        result.addTask(createSaveStatisticsStep(jobId));
+        outputSide.addStepPhase2_BuildDictionary(result);
+
+        // Phase 3: Build Cube
+        result.addTask(createInMemCubingStep(flatHiveTableDesc, jobId));
+        outputSide.addStepPhase3_BuildCube(result);
+
+        // Phase 4: Update Metadata & Cleanup
+        result.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
+        inputSide.addStepPhase4_Cleanup(result);
+        outputSide.addStepPhase4_Cleanup(result);
+
+        return result;
+    }
+
+    private SaveStatisticsStep createSaveStatisticsStep(String jobId) {
+        SaveStatisticsStep result = new SaveStatisticsStep();
+        result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
+        result.setCubeName(seg.getCubeInstance().getName());
+        result.setSegmentId(seg.getUuid());
+        result.setStatisticsPath(getStatisticsPath(jobId));
+        return result;
+    }
+
+    private MapReduceExecutable createInMemCubingStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId) {
+        // in-mem cube build job
+        MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
+
+        StringBuilder cmd = new StringBuilder();
+        appendMapReduceParameters(cmd, seg);
+
+        baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
+
+        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "segmentname", seg.getName());
+        appendExecCmdParameters(cmd, "input", getFlatHiveTableLocation(flatHiveTableDesc, jobId));
+        appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
+        appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "level", "0");
+        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+
+        baseCuboidStep.setMapReduceParams(cmd.toString());
+        baseCuboidStep.setMapReduceJobClass(InMemCuboidJob.class);
+        baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
+        return baseCuboidStep;
+    }
+
+}
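
How this builder is reached at runtime, as a minimal sketch ("ADMIN" is just an
example submitter name; segment lookup is elided):

    // MRBatchCubingEngine2, added later in this commit, wires the two new builders in.
    public DefaultChainedExecutable buildJobFor(CubeSegment newSegment) {
        IBatchCubingEngine engine = new MRBatchCubingEngine2();
        return engine.createBatchCubingJob(newSegment, "ADMIN");
    }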

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
new file mode 100644
index 0000000..f97de13
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import java.util.List;
+
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2;
+import org.apache.kylin.job.common.MapReduceExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.hadoop.cubev2.MergeCuboidFromHBaseJob;
+import org.apache.kylin.job.hadoop.cubev2.MergeStatisticsStep;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class BatchMergeJobBuilder2 extends JobBuilderSupport {
+
+    private final IMRBatchMergeOutputSide2 outputSide;
+    
+    public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
+        super(mergeSegment, submitter);
+        this.outputSide = MRUtil.getBatchMergeOutputSide2(seg);
+    }
+
+    public CubingJob build() {
+        final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
+        final String jobId = result.getId();
+
+        final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+        Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge");
+        final List<String> mergingSegmentIds = Lists.newArrayList();
+        final List<String> mergingHTables = Lists.newArrayList();
+        for (CubeSegment merging : mergingSegments) {
+            mergingSegmentIds.add(merging.getUuid());
+            mergingHTables.add(merging.getStorageLocationIdentifier());
+        }
+
+        // Phase 1: Merge Dictionary
+        result.addTask(createMergeDictionaryStep(mergingSegmentIds));
+        result.addTask(createMergeStatisticsStep(seg, mergingSegmentIds, getStatisticsPath(jobId)));
+        outputSide.addStepPhase1_MergeDictionary(result);
+
+        // Phase 2: Merge Cube
+        String formattedTables = StringUtil.join(mergingHTables, ",");
+        result.addTask(createMergeCuboidDataFromStorageStep(formattedTables, jobId));
+        outputSide.addStepPhase2_BuildCube(result);
+
+        // Phase 3: Update Metadata & Cleanup
+        result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
+        outputSide.addStepPhase3_Cleanup(result);
+
+        return result;
+    }
+
+    private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
+        MergeStatisticsStep result = new MergeStatisticsStep();
+        result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
+        result.setCubeName(seg.getCubeInstance().getName());
+        result.setSegmentId(seg.getUuid());
+        result.setMergingSegmentIds(mergingSegmentIds);
+        result.setMergedStatisticsPath(mergedStatisticsFolder);
+        return result;
+    }
+
+    private MapReduceExecutable createMergeCuboidDataFromStorageStep(String inputTableNames, String jobId) {
+        MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
+        mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
+        StringBuilder cmd = new StringBuilder();
+
+        appendMapReduceParameters(cmd, seg);
+        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "segmentname", seg.getName());
+        appendExecCmdParameters(cmd, "input", inputTableNames);
+        appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
+        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
+
+        mergeCuboidDataStep.setMapReduceParams(cmd.toString());
+        mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidFromHBaseJob.class);
+        mergeCuboidDataStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
+        return mergeCuboidDataStep;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
index 8896a2e..bc6ee1f 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
@@ -37,7 +37,13 @@ public interface IMROutput {
      */
     public interface IMRBatchCubingOutputSide {
         
-        /** Add step that saves cuboid output from HDFS to storage. */
+        /**
+         * Add step that saves cuboid output from HDFS to storage.
+         * 
+         * The cuboid output is a directory of sequence files, where the key takes the format "CUBOID,D1,D2,..,Dn" 
+         * and the value takes the format "M1,M2,..,Mm". CUBOID is an 8-byte cuboid ID; Dx is a dictionary-encoded
+         * dimension value; Mx is a measure value in serialized form.
+         */
         public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
         
         /** Add step that does any necessary clean up. */
@@ -57,7 +63,13 @@ public interface IMROutput {
      */
     public interface IMRBatchMergeOutputSide {
         
-        /** Add step that saves cuboid output from HDFS to storage. */
+        /**
+         * Add step that saves cuboid output from HDFS to storage.
+         * 
+         * The cuboid output is a directory of sequence files, where the key takes the format "CUBOID,D1,D2,..,Dn" 
+         * and the value takes the format "M1,M2,..,Mm". CUBOID is an 8-byte cuboid ID; Dx is a dictionary-encoded
+         * dimension value; Mx is a measure value in serialized form.
+         */
         public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
         
         /** Add step that does any necessary clean up. */
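
Since the key layout documented above is fixed-width for the cuboid ID, recovering the
ID from a raw sequence-file key is a one-liner; a sketch (dictionary decoding of D1..Dn
is elided, as it needs the segment's dictionaries):

    // First 8 bytes of each key are the cuboid ID, per the javadoc above;
    // assumes the ID was written big-endian, which is ByteBuffer's default order.
    public static long readCuboidId(byte[] rowKey) {
        return java.nio.ByteBuffer.wrap(rowKey, 0, 8).getLong();
    }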

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
new file mode 100644
index 0000000..9aecba9
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
@@ -0,0 +1,53 @@
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public interface IMROutput2 {
+
+    /** Return a helper to participate in batch cubing job flow. */
+    public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(CubeSegment seg);
+    
+    /**
+     * Participates in the batch cubing flow as the output side.
+     * 
+     * - Phase 1: Create Flat Table
+     * - Phase 2: Build Dictionary
+     * - Phase 3: Build Cube
+     * - Phase 4: Update Metadata & Cleanup
+     */
+    public interface IMRBatchCubingOutputSide2 {
+        
+        /** Add step that executes after build dictionary and before build cube. */
+        public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow);
+
+        /** Add step that executes after build cube. */
+        public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow);
+        
+        /** Add step that does any necessary clean up. */
+        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+    }
+    
+    /** Return a helper to participate in batch merge job flow. */
+    public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(CubeSegment seg);
+    
+    /**
+     * Participates in the batch merge flow as the output side.
+     * 
+     * - Phase 1: Merge Dictionary
+     * - Phase 2: Merge Cube
+     * - Phase 3: Update Metadata & Cleanup
+     */
+    public interface IMRBatchMergeOutputSide2 {
+        
+        /** Add step that executes after merge dictionary and before merge cube. */
+        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
+
+        /** Add step that executes after merge cube. */
+        public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow);
+        
+        /** Add step that does any necessary clean up. */
+        public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
+    }
+    
+}
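
To make the contract concrete, the smallest legal implementation, sketched here only
for orientation (imports as in the file above; HBaseMROutput2 at the end of this
commit is the real adapter):

    public class NoopMROutput2 implements IMROutput2 {
        @Override
        public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(CubeSegment seg) {
            return new IMRBatchCubingOutputSide2() {
                public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {}
                public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {}
                public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {}
            };
        }
        @Override
        public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(CubeSegment seg) {
            return new IMRBatchMergeOutputSide2() {
                public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {}
                public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow) {}
                public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {}
            };
        }
    }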

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 4652269..42d30c8 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -23,7 +23,6 @@ import java.util.List;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.job.common.HadoopShellExecutable;
 import org.apache.kylin.job.common.MapReduceExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
@@ -49,15 +48,15 @@ public class JobBuilderSupport {
         this.submitter = submitter;
     }
     
-    public MapReduceExecutable createFactDistinctColumnsStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId) {
-        return createFactDistinctColumnsStep(flatHiveTableDesc, jobId, false);
+    public MapReduceExecutable createFactDistinctColumnsStep(String jobId) {
+        return createFactDistinctColumnsStep(jobId, false);
     }
     
-    public MapReduceExecutable createFactDistinctColumnsStepWithStats(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId) {
-        return createFactDistinctColumnsStep(flatHiveTableDesc, jobId, true);
+    public MapReduceExecutable createFactDistinctColumnsStepWithStats(String jobId) {
+        return createFactDistinctColumnsStep(jobId, true);
     }
     
-    private MapReduceExecutable createFactDistinctColumnsStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId, boolean withStats) {
+    private MapReduceExecutable createFactDistinctColumnsStep(String jobId, boolean withStats) {
         MapReduceExecutable result = new MapReduceExecutable();
         result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
         result.setMapReduceJobClass(FactDistinctColumnsJob.class);
@@ -142,10 +141,6 @@ public class JobBuilderSupport {
         }
     }
     
-    public String getFlatHiveTableLocation(CubeJoinedFlatTableDesc flatTableDesc, String jobId) {
-        return getJobWorkingDir(jobId) + "/" + flatTableDesc.getTableName();
-    }
-
     public String getFactDistinctColumnsPath(String jobId) {
         return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/fact_distinct_columns";
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
new file mode 100644
index 0000000..57ec128
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.IBatchCubingEngine;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public class MRBatchCubingEngine2 implements IBatchCubingEngine {
+
+    @Override
+    public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
+        return new BatchCubingJobBuilder2(newSegment, submitter).build();
+    }
+
+    @Override
+    public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
+        return new BatchMergeJobBuilder2(mergeSegment, submitter).build();
+    }
+    
+    @Override
+    public Class<?> getSourceInterface() {
+        return IMRInput.class;
+    }
+
+    @Override
+    public Class<?> getStorageInterface() {
+        return IMROutput2.class;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 099a614..4c44af7 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -6,6 +6,8 @@ import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
 import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
 import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.source.TableSourceFactory;
@@ -42,4 +44,12 @@ public class MRUtil {
         return StorageFactory2.createEngineAdapter(seg.getCubeInstance(), IMROutput.class).getBatchMergeOutputSide(seg);
     }
 
+    public static IMRBatchCubingOutputSide2 getBatchCubingOutputSide2(CubeSegment seg) {
+        return StorageFactory2.createEngineAdapter(seg.getCubeInstance(), IMROutput2.class).getBatchCubingOutputSide(seg);
+    }
+    
+    public static IMRBatchMergeOutputSide2 getBatchMergeOutputSide2(CubeSegment seg) {
+        return StorageFactory2.createEngineAdapter(seg.getCubeInstance(), IMROutput2.class).getBatchMergeOutputSide(seg);
+    }
+    
 }
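
The two new lookups mirror the existing IMROutput ones; typical use from a builder:

    IMRBatchCubingOutputSide2 out = MRUtil.getBatchCubingOutputSide2(seg);
    out.addStepPhase2_BuildDictionary(jobFlow);  // storage-specific steps slot in here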

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr2/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr2/BatchCubingJobBuilder2.java b/job/src/main/java/org/apache/kylin/engine/mr2/BatchCubingJobBuilder2.java
deleted file mode 100644
index e83db30..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr2/BatchCubingJobBuilder2.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr2;
-
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.job.common.MapReduceExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.hadoop.cubev2.InMemCuboidJob;
-import org.apache.kylin.job.hadoop.cubev2.SaveStatisticsStep;
-
-public class BatchCubingJobBuilder2 extends JobBuilderSupport {
-
-    private final IMRBatchCubingInputSide inputSide;
-
-    public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
-        super(newSegment, submitter);
-        this.inputSide = MRUtil.getBatchCubingInputSide(seg);
-    }
-
-    public CubingJob build() {
-        final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
-        final String jobId = result.getId();
-        final CubeJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
-
-        // Phase 1: Create Flat Table
-        inputSide.addStepPhase1_CreateFlatTable(result);
-
-        // Phase 2: Build Dictionary
-        result.addTask(createFactDistinctColumnsStepWithStats(flatHiveTableDesc, jobId));
-        result.addTask(createBuildDictionaryStep(jobId));
-
-        // Phase 3: Build Cube
-        result.addTask(createSaveStatisticsStep(jobId)); //<<<<<
-
-        // create htable step
-        result.addTask(createCreateHTableStepWithStats(jobId)); //<<<<<
-        result.addTask(createInMemCubingStep(flatHiveTableDesc, result.getId()));
-        // bulk load step
-        result.addTask(createBulkLoadStep(jobId)); //<<<<<
-
-        // Phase 4: Update Metadata & Cleanup
-        result.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
-        inputSide.addStepPhase4_Cleanup(result);
-
-        return result;
-    }
-
-    private SaveStatisticsStep createSaveStatisticsStep(String jobId) {
-        SaveStatisticsStep result = new SaveStatisticsStep();
-        result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
-        result.setCubeName(seg.getCubeInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setStatisticsPath(getStatisticsPath(jobId));
-        return result;
-    }
-
-    private MapReduceExecutable createInMemCubingStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId) {
-        // base cuboid job
-        MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
-
-        StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, seg);
-
-        baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
-
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", getFlatHiveTableLocation(flatHiveTableDesc, jobId));
-        appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
-        appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "level", "0");
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-
-        baseCuboidStep.setMapReduceParams(cmd.toString());
-        baseCuboidStep.setMapReduceJobClass(InMemCuboidJob.class);
-        baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
-        return baseCuboidStep;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr2/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr2/BatchMergeJobBuilder2.java b/job/src/main/java/org/apache/kylin/engine/mr2/BatchMergeJobBuilder2.java
deleted file mode 100644
index 25ff082..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr2/BatchMergeJobBuilder2.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr2;
-
-import java.util.List;
-
-import org.apache.kylin.common.util.StringUtil;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
-import org.apache.kylin.job.common.MapReduceExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.hadoop.cubev2.MergeCuboidFromHBaseJob;
-import org.apache.kylin.job.hadoop.cubev2.MergeStatisticsStep;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-public class BatchMergeJobBuilder2 extends JobBuilderSupport {
-
-    public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
-        super(mergeSegment, submitter);
-    }
-
-    public CubingJob build() {
-        final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
-        final String jobId = result.getId();
-
-        final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
-        Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
-        final List<String> mergingSegmentIds = Lists.newArrayList();
-        final List<String> mergingCuboidPaths = Lists.newArrayList();
-        final List<String> mergingHTables = Lists.newArrayList();
-        for (CubeSegment merging : mergingSegments) {
-            mergingSegmentIds.add(merging.getUuid());
-            mergingCuboidPaths.add(getCuboidRootPath(merging) + "*");
-            mergingHTables.add(merging.getStorageLocationIdentifier());
-        }
-
-        result.addTask(createMergeDictionaryStep(mergingSegmentIds));
-
-        String mergedStatisticsFolder = getStatisticsPath(jobId);
-        result.addTask(createMergeStatisticsStep(seg, mergingSegmentIds, mergedStatisticsFolder));
-
-        // create htable step
-        result.addTask(createCreateHTableStepWithStats(jobId)); //<<<<<
-
-        String formattedTables = StringUtil.join(mergingHTables, ",");
-        result.addTask(createMergeCuboidDataFromHBaseStep(formattedTables, jobId));
-
-        // bulk load step
-        result.addTask(createBulkLoadStep(jobId)); //<<<<<
-
-        // update cube info
-        result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
-
-        result.addTask(createGarbageCollectionStep(mergingHTables, null)); //<<<<<
-
-        return result;
-    }
-
-    private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
-        MergeStatisticsStep result = new MergeStatisticsStep();
-        result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
-        result.setCubeName(seg.getCubeInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setMergingSegmentIds(mergingSegmentIds);
-        result.setMergedStatisticsPath(mergedStatisticsFolder);
-        return result;
-    }
-
-    private MapReduceExecutable createMergeCuboidDataFromHBaseStep(String inputTableNames, String jobId) {
-        MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
-        mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
-        StringBuilder cmd = new StringBuilder();
-
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", inputTableNames);
-        appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
-
-        mergeCuboidDataStep.setMapReduceParams(cmd.toString());
-        mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidFromHBaseJob.class);
-        mergeCuboidDataStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
-        return mergeCuboidDataStep;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr2/IMROutput2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr2/IMROutput2.java b/job/src/main/java/org/apache/kylin/engine/mr2/IMROutput2.java
deleted file mode 100644
index aeddb9b..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr2/IMROutput2.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package org.apache.kylin.engine.mr2;
-
-public interface IMROutput2 {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr2/MRBatchCubingEngine2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr2/MRBatchCubingEngine2.java b/job/src/main/java/org/apache/kylin/engine/mr2/MRBatchCubingEngine2.java
deleted file mode 100644
index 8ec6f69..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr2/MRBatchCubingEngine2.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr2;
-
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.IBatchCubingEngine;
-import org.apache.kylin.engine.mr.IMRInput;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-
-public class MRBatchCubingEngine2 implements IBatchCubingEngine {
-
-    @Override
-    public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
-        return new BatchCubingJobBuilder2(newSegment, submitter).build();
-    }
-
-    @Override
-    public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
-        return new BatchMergeJobBuilder2(mergeSegment, submitter).build();
-    }
-    
-    @Override
-    public Class<?> getSourceInterface() {
-        return IMRInput.class;
-    }
-
-    @Override
-    public Class<?> getStorageInterface() {
-        return IMROutput2.class;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
index 87fc188..46bb9fc 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.job.hadoop.cube;
 
 import org.apache.commons.cli.Options;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -32,11 +33,15 @@ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidCLI;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,6 +60,9 @@ public class CuboidJob extends AbstractHadoopJob {
 
     @Override
     public int run(String[] args) throws Exception {
+        if (this.mapperClass == null)
+            throw new Exception("Mapper class is not set!");
+        
         Options options = new Options();
 
         try {
@@ -67,7 +75,6 @@ public class CuboidJob extends AbstractHadoopJob {
             options.addOption(OPTION_INPUT_FORMAT);
             parseOptions(options, args);
 
-            Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
             Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
             String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
             int nCuboidLevel = Integer.parseInt(getOptionValue(OPTION_NCUBOID_LEVEL));
@@ -79,26 +86,11 @@ public class CuboidJob extends AbstractHadoopJob {
 
             job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
             logger.info("Starting: " + job.getJobName());
-            FileInputFormat.setInputPaths(job, input);
 
             setJobClasspath(job);
 
             // Mapper
-            if (this.mapperClass == null) {
-                throw new Exception("Mapper class is not set!");
-            }
-
-            boolean isInputTextFormat = false;
-            if (hasOption(OPTION_INPUT_FORMAT) && ("textinputformat".equalsIgnoreCase(getOptionValue(OPTION_INPUT_FORMAT)))) {
-                isInputTextFormat = true;
-            }
-
-            if (isInputTextFormat) {
-                job.setInputFormatClass(TextInputFormat.class);
-
-            } else {
-                job.setInputFormatClass(SequenceFileInputFormat.class);
-            }
+            configureMapperInputFormat(cube.getSegment(segmentName, SegmentStatusEnum.NEW));
             job.setMapperClass(this.mapperClass);
             job.setMapOutputKeyClass(Text.class);
             job.setMapOutputValueClass(Text.class);
@@ -130,6 +122,25 @@ public class CuboidJob extends AbstractHadoopJob {
         }
     }
 
+    private void configureMapperInputFormat(CubeSegment cubeSeg) throws IOException {
+        String input = getOptionValue(OPTION_INPUT_PATH);
+        
+        if (StringUtils.isBlank(input)) {
+            // base cuboid case
+            IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+            flatTableInputFormat.configureJob(job);
+        } else {
+            // n-dimension cuboid case
+            FileInputFormat.setInputPaths(job, new Path(input));
+            if (hasOption(OPTION_INPUT_FORMAT) && ("textinputformat".equalsIgnoreCase(getOptionValue(OPTION_INPUT_FORMAT)))) {
+                job.setInputFormatClass(TextInputFormat.class);
+            } else {
+                job.setInputFormatClass(SequenceFileInputFormat.class);
+            }
+        }
+    }
+
     protected void setReduceTaskNum(Job job, KylinConfig config, String cubeName, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException {
         Configuration jobConf = job.getConfiguration();
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
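
Both halves of the IMRTableInputFormat contract now have call sites in this commit:
configureJob() at job setup (CuboidJob above) and parseMapperInput() per record
(HiveToBaseCuboidMapper, next diff). Condensed:

    IMRTableInputFormat fmt = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
    fmt.configureJob(job);                        // driver side: pick InputFormat, set paths
    // ... later, inside the mapper ...
    String[] row = fmt.parseMapperInput(value);   // mapper side: one record -> column values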

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapper.java
index 599dde8..9fa1159 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapper.java
@@ -19,25 +19,37 @@
 package org.apache.kylin.job.hadoop.cube;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 
-import org.apache.hadoop.io.Text;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.job.constant.BatchConstants;
 
 /**
  * @author George Song (ysong1)
  */
-public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, Text> {
+public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, Object> {
+    
+    private IMRTableInputFormat flatTableInputFormat;
 
     @Override
-    public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException {
+    protected void setup(Context context) throws IOException {
+        super.setup(context);
+        flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat();
+    }
+
+    @Override
+    public void map(KEYIN key, Object value, Context context) throws IOException, InterruptedException {
         counter++;
         if (counter % BatchConstants.COUNTER_MAX == 0) {
             logger.info("Handled " + counter + " records!");
         }
 
         try {
             //put a record into the shared bytesSplitter
-            bytesSplitter.split(value.getBytes(), value.getLength(), byteRowDelimiter);
+            String[] row = flatTableInputFormat.parseMapperInput(value);
+            bytesSplitter.setBuffers(convertUTF8Bytes(row));
             //take care of the data in bytesSplitter
             outputKV(context);
 
@@ -46,4 +58,12 @@ public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, T
         }
     }
 
+    private byte[][] convertUTF8Bytes(String[] row) throws UnsupportedEncodingException {
+        byte[][] result = new byte[row.length][];
+        for (int i = 0; i < row.length; i++) {
+            result[i] = row[i].getBytes("UTF-8");
+        }
+        return result;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java
new file mode 100644
index 0000000..63e5902
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public class HBaseMROutput2 implements IMROutput2 {
+
+    @Override
+    public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(final CubeSegment seg) {
+        return new IMRBatchCubingOutputSide2() {
+            HBaseMRSteps steps = new HBaseMRSteps(seg);
+
+            @Override
+            public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
+                // assumption: this step builder moves into HBaseMRSteps, not yet added in this half-way commit
+                jobFlow.addTask(steps.createCreateHTableStepWithStats(jobFlow.getId())); //<<<<<
+            }
+
+            @Override
+            public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
+                jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId())); //<<<<<
+            }
+
+            @Override
+            public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+                // nothing to do
+            }
+        };
+    }
+
+    @Override
+    public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(final CubeSegment seg) {
+        return new IMRBatchMergeOutputSide2() {
+            HBaseMRSteps steps = new HBaseMRSteps(seg);
+
+            @Override
+            public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+                // assumption: this step builder moves into HBaseMRSteps, not yet added in this half-way commit
+                jobFlow.addTask(steps.createCreateHTableStepWithStats(jobFlow.getId())); //<<<<<
+            }
+
+            @Override
+            public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow) {
+                jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId())); //<<<<<
+            }
+
+            @Override
+            public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
+                // assumption: a helper like steps.getMergingHTables() lists the HTables of the segments being merged
+                jobFlow.addTask(steps.createGarbageCollectionStep(steps.getMergingHTables(), null)); //<<<<<
+            }
+        };
+    }
+}
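
HBaseMROutput2 above instantiates an HBaseMRSteps that this half-way commit does not
yet include; a hypothetical skeleton of what the call sites assume (names inferred
from usage, not from committed code):

    // Hypothetical: referenced by HBaseMROutput2 but absent from commit 9a82f39b.
    public class HBaseMRSteps extends JobBuilderSupport {

        public HBaseMRSteps(CubeSegment seg) {
            super(seg, null);  // submitter is unused for step construction (assumption)
        }

        // Expected to host the storage-side steps marked //<<<<< in the old builders:
        // createCreateHTableStepWithStats(jobId), createBulkLoadStep(jobId),
        // createGarbageCollectionStep(htables, hdfsPaths), and a helper such as
        // getMergingHTables() listing the HTables of the segments being merged.
    }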