You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/08 12:16:12 UTC

incubator-kylin git commit: KYLIN-876 Refactor CubingJobBuilder to decouple individual steps

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8 ba9a8a143 -> e38ddb780


KYLIN-876 Refactor CubingJobBuilder to decouple individual steps


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/e38ddb78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/e38ddb78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/e38ddb78

Branch: refs/heads/0.8
Commit: e38ddb78081dffefe19c563d74c2a279804f0e07
Parents: ba9a8a1
Author: Li, Yang <ya...@ebay.com>
Authored: Tue Jul 7 18:24:26 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Wed Jul 8 18:15:07 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/engine/IBatchCubeBuilder.java  | 32 +++++++
 .../org/apache/kylin/engine/IBuildEngine.java   | 24 +++++
 .../org/apache/kylin/engine/mr/IMRInput.java    | 25 +++++
 .../kylin/engine/mr/IMRJobFlowParticipant.java  | 34 +++++++
 .../org/apache/kylin/engine/mr/IMROutput.java   | 25 +++++
 .../apache/kylin/engine/mr/MRBuildEngine.java   | 31 +++++++
 .../apache/kylin/engine/mr/MRCubeBuilder.java   | 45 +++++++++
 .../kylin/job/common/MapReduceExecutable.java   | 36 ++++++-
 .../org/apache/kylin/job/cube/CubingJob.java    | 36 ++++++-
 .../apache/kylin/job/cube/CubingJobBuilder.java | 98 ++++++++------------
 .../job/cube/UpdateCubeInfoAfterBuildStep.java  | 64 +++----------
 .../job/cube/UpdateCubeInfoAfterMergeStep.java  | 30 ++----
 .../kylin/job/execution/ChainedExecutable.java  |  4 +-
 .../job/execution/DefaultChainedExecutable.java |  1 +
 .../kylin/job/manager/ExecutableManager.java    |  8 +-
 .../org/apache/kylin/source/ITableSource.java   | 24 +++++
 .../apache/kylin/source/hive/HiveMRInput.java   | 32 +++++++
 .../kylin/source/hive/HiveTableSource.java      | 36 +++++++
 .../java/org/apache/kylin/storage/IStorage.java | 28 ++++++
 .../kylin/storage/hbase/HBaseMROutput.java      | 32 +++++++
 .../kylin/storage/hbase/HBaseStorage.java       | 44 +++++++++
 .../kylin/query/enumerator/OLAPEnumerator.java  |  4 +-
 .../kylin/storage/ICachableStorageEngine.java   |  2 +-
 .../apache/kylin/storage/IStorageEngine.java    | 36 -------
 .../org/apache/kylin/storage/IStorageQuery.java | 36 +++++++
 .../kylin/storage/StorageEngineFactory.java     |  4 +-
 .../AbstractCacheFledgedStorageEngine.java      |  4 +-
 .../storage/hybrid/HybridStorageEngine.java     |  8 +-
 .../kylin/storage/test/ITStorageTest.java       |  4 +-
 29 files changed, 593 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/engine/IBatchCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/IBatchCubeBuilder.java b/job/src/main/java/org/apache/kylin/engine/IBatchCubeBuilder.java
new file mode 100644
index 0000000..2f5344a
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/IBatchCubeBuilder.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.ChainedExecutable;
+
+public interface IBatchCubeBuilder {
+
+    /** Creates the job that builds a new cube segment; typically the segment's time range is appended to the end of the current cube. */
+    public ChainedExecutable createBuildJob(CubeSegment newSegment);
+    
+    /** Creates the job that merges multiple small segments into a big one. */
+    public ChainedExecutable createMergeJob(CubeSegment mergeSegment);
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/engine/IBuildEngine.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/IBuildEngine.java b/job/src/main/java/org/apache/kylin/engine/IBuildEngine.java
new file mode 100644
index 0000000..e54429f
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/IBuildEngine.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine;
+
+public interface IBuildEngine {
+
+    public IBatchCubeBuilder createBatchCubeBuilder(); // returns the builder used to create batch build and merge jobs
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
new file mode 100644
index 0000000..3097a4b
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+public interface IMRInput {
+    
+    public IMRJobFlowParticipant createBuildFlowParticipant(); // returns the participant that contributes this input's steps to the build job flow
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/engine/mr/IMRJobFlowParticipant.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMRJobFlowParticipant.java b/job/src/main/java/org/apache/kylin/engine/mr/IMRJobFlowParticipant.java
new file mode 100644
index 0000000..6a94920
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/IMRJobFlowParticipant.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import java.util.List;
+
+import org.apache.kylin.job.execution.AbstractExecutable;
+
+public interface IMRJobFlowParticipant { // one hook per build phase (1 to 4); each takes a step list and returns a step list. NOTE(review): whether the argument may be modified or returned as-is is not specified here - confirm
+
+    public List<? extends AbstractExecutable> contributePhase1CreateFlatTable(List<? extends AbstractExecutable> steps);
+    
+    public List<? extends AbstractExecutable> contributePhase2CreateDictionary(List<? extends AbstractExecutable> steps);
+    
+    public List<? extends AbstractExecutable> contributePhase3BuildCube(List<? extends AbstractExecutable> steps);
+    
+    public List<? extends AbstractExecutable> contributePhase4UpdateCubeMetadata(List<? extends AbstractExecutable> steps);
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
new file mode 100644
index 0000000..697957e
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+public interface IMROutput {
+
+    public IMRJobFlowParticipant createBuildFlowParticipant(); // returns the participant that contributes this output's steps to the build job flow
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/engine/mr/MRBuildEngine.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MRBuildEngine.java b/job/src/main/java/org/apache/kylin/engine/mr/MRBuildEngine.java
new file mode 100644
index 0000000..80b9009
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/MRBuildEngine.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.engine.IBatchCubeBuilder;
+import org.apache.kylin.engine.IBuildEngine;
+
+public class MRBuildEngine implements IBuildEngine {
+
+    @Override
+    public IBatchCubeBuilder createBatchCubeBuilder() {
+        return new MRCubeBuilder(); // a new builder instance on every call
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/engine/mr/MRCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MRCubeBuilder.java b/job/src/main/java/org/apache/kylin/engine/mr/MRCubeBuilder.java
new file mode 100644
index 0000000..6bf0423
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/MRCubeBuilder.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.IBatchCubeBuilder;
+import org.apache.kylin.job.execution.ChainedExecutable;
+
+public class MRCubeBuilder implements IBatchCubeBuilder {
+
+    @Override
+    public ChainedExecutable createBuildJob(CubeSegment newSegment) {
+        // TODO not implemented yet: placeholder that ignores newSegment and returns null
+        return null;
+    }
+
+    @Override
+    public ChainedExecutable createMergeJob(CubeSegment mergeSegment) {
+        // TODO not implemented yet: placeholder that ignores mergeSegment and returns null
+        return null;
+    }
+
+    /** Builds a new segment and merges it with the existing segment in a single job. Deprecated: only needed by Holistic Distinct Count. Not implemented yet; currently returns null. */
+    public ChainedExecutable createBuildAndMergeJob(CubeSegment appendSegment, CubeSegment mergeSegment) {
+        return null;
+    }
+    
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java b/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
index af6cb9c..f8eab6c 100644
--- a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
+++ b/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
@@ -53,6 +53,8 @@ public class MapReduceExecutable extends AbstractExecutable {
 
     private static final String KEY_MR_JOB = "MR_JOB_CLASS";
     private static final String KEY_PARAMS = "MR_JOB_PARAMS";
+    private static final String KEY_COUNTER_SAVEAS = "MR_COUNTER_SAVEAS";
+    
     public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";
 
     public MapReduceExecutable() {
@@ -145,11 +147,8 @@ public class MapReduceExecutable extends AbstractExecutable {
                 status = newStatus;
                 executableManager.addJobInfo(getId(), hadoopCmdOutput.getInfo());
                 if (status.isComplete()) {
-                    hadoopCmdOutput.updateJobCounter();
                     final Map<String, String> info = hadoopCmdOutput.getInfo();
-                    info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, hadoopCmdOutput.getMapInputRecords());
-                    info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, hadoopCmdOutput.getHdfsBytesRead());
-                    info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hadoopCmdOutput.getHdfsBytesWritten());
+                    readCounters(hadoopCmdOutput, info);
                     executableManager.addJobInfo(getId(), info);
 
                     if (status == JobStepStatusEnum.FINISHED) {
@@ -173,6 +172,27 @@ public class MapReduceExecutable extends AbstractExecutable {
         }
     }
 
+    private void readCounters(final HadoopCmdOutput hadoopCmdOutput, final Map<String, String> info) { // refreshes the job counters and copies three of them into info
+        hadoopCmdOutput.updateJobCounter();
+        info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, hadoopCmdOutput.getMapInputRecords());
+        info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, hadoopCmdOutput.getHdfsBytesRead());
+        info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hadoopCmdOutput.getHdfsBytesWritten());
+        // Optionally store the same counters again under the keys listed in the KEY_COUNTER_SAVEAS param.
+        String saveAs = getParam(KEY_COUNTER_SAVEAS);
+        if (saveAs != null) {
+            String[] saveAsNames = saveAs.split(","); // positional list; a position may be left empty (e.g. ",,name") to skip that counter
+            saveCounterAs(hadoopCmdOutput.getMapInputRecords(), saveAsNames, 0, info); // position 0: map input records
+            saveCounterAs(hadoopCmdOutput.getHdfsBytesRead(), saveAsNames, 1, info); // position 1: HDFS bytes read
+            saveCounterAs(hadoopCmdOutput.getHdfsBytesWritten(), saveAsNames, 2, info); // position 2: HDFS bytes written
+        }
+    }
+
+    private void saveCounterAs(String counter, String[] saveAsNames, int i, Map<String, String> info) { // puts counter into info under the i-th name, if any
+        if (saveAsNames.length > i && StringUtils.isBlank(saveAsNames[i]) == false) { // skip when the list is too short or the name at i is blank
+            info.put(saveAsNames[i].trim(), counter); // the key is the name with surrounding whitespace removed
+        }
+    }
+
     private String getRestStatusCheckUrl(Job job, KylinConfig config) {
         final String yarnStatusCheckUrl = config.getYarnStatusCheckUrl();
         if (yarnStatusCheckUrl != null) {
@@ -216,5 +236,13 @@ public class MapReduceExecutable extends AbstractExecutable {
     public String getMapReduceParams() {
         return getParam(KEY_PARAMS);
     }
+    
+    public String getCounterSaveAs() { // returns the raw KEY_COUNTER_SAVEAS param as stored by setCounterSaveAs
+        return getParam(KEY_COUNTER_SAVEAS);
+    }
+    
+    public void setCounterSaveAs(String value) { // value: comma-separated keys, in order, for map input records, HDFS bytes read, HDFS bytes written; blank positions are skipped
+        setParam(KEY_COUNTER_SAVEAS, value);
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java
index e8bfa68..be52040 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java
@@ -39,14 +39,18 @@ import org.apache.kylin.job.execution.Output;
  */
 public class CubingJob extends DefaultChainedExecutable {
 
-    public CubingJob() {
-        super();
-    }
-
+    // KEYS of Output.extraInfo map, info passed across job steps
+    public static final String SOURCE_RECORD_COUNT = "sourceRecordCount";
+    public static final String SOURCE_SIZE_BYTES = "sourceSizeBytes";
+    public static final String CUBE_SIZE_BYTES = "byteSizeBytes";
+    public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";
+    
     private static final String CUBE_INSTANCE_NAME = "cubeName";
     private static final String SEGMENT_ID = "segmentId";
-    public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";
 
+    public CubingJob() {
+        super();
+    }
 
     void setCubeName(String name) {
         setParam(CUBE_INSTANCE_NAME, name);
@@ -136,4 +140,26 @@ public class CubingJob extends DefaultChainedExecutable {
     public void setMapReduceWaitTime(long t) {
         addExtraInfo(MAP_REDUCE_WAIT_TIME, t + "");
     }
+    
+    public long findSourceRecordCount() { // value stored under SOURCE_RECORD_COUNT by a child task, or 0 when no task has it
+        return Long.parseLong(findExtraInfo(SOURCE_RECORD_COUNT, "0")); // throws NumberFormatException if the stored text is not a long
+    }
+    
+    public long findSourceSizeBytes() { // value stored under SOURCE_SIZE_BYTES by a child task, or 0 when no task has it
+        return Long.parseLong(findExtraInfo(SOURCE_SIZE_BYTES, "0")); // throws NumberFormatException if the stored text is not a long
+    }
+    
+    public long findCubeSizeBytes() { // value stored under CUBE_SIZE_BYTES by a child task, or 0 when no task has it
+        return Long.parseLong(findExtraInfo(CUBE_SIZE_BYTES, "0")); // throws NumberFormatException if the stored text is not a long
+    }
+    
+    private String findExtraInfo(String key, String dft) { // returns the first non-null extra-info value for key among the child tasks, else dft
+        for (AbstractExecutable child : getTasks()) { // tasks are scanned in the order getTasks() returns them; the first match wins
+            Output output = executableManager.getOutput(child.getId());
+            String value = output.getExtra().get(key); // NOTE(review): output and getExtra() are dereferenced without a null check - confirm neither can be null
+            if (value != null)
+                return value;
+        }
+        return dft;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
index 1e73436..21d1f8d 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
@@ -18,10 +18,13 @@
 
 package org.apache.kylin.job.cube;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.TimeZone;
+
 import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.job.AbstractJobBuilder;
@@ -30,7 +33,12 @@ import org.apache.kylin.job.common.MapReduceExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.hadoop.cube.*;
+import org.apache.kylin.job.hadoop.cube.BaseCuboidJob;
+import org.apache.kylin.job.hadoop.cube.CubeHFileJob;
+import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsJob;
+import org.apache.kylin.job.hadoop.cube.MergeCuboidJob;
+import org.apache.kylin.job.hadoop.cube.NDCuboidJob;
+import org.apache.kylin.job.hadoop.cube.RangeKeyDistributionJob;
 import org.apache.kylin.job.hadoop.cubev2.InMemCuboidJob;
 import org.apache.kylin.job.hadoop.cubev2.MergeCuboidFromHBaseJob;
 import org.apache.kylin.job.hadoop.cubev2.MergeStatisticsStep;
@@ -39,11 +47,8 @@ import org.apache.kylin.job.hadoop.dict.CreateDictionaryJob;
 import org.apache.kylin.job.hadoop.hbase.BulkLoadJob;
 import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
 
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.List;
-import java.util.TimeZone;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
 /**
  */
@@ -65,22 +70,17 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         final String cuboidRootPath = getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/cuboid/";
 
         // cubing
-        Pair<AbstractExecutable, AbstractExecutable> twoSteps = addCubingSteps(seg, cuboidRootPath, result);
-        String intermediateHiveTableStepId = twoSteps.getFirst().getId();
-        String baseCuboidStepId = twoSteps.getSecond().getId();
-
+        addCubingSteps(seg, cuboidRootPath, result);
 
         if (this.inMemoryCubing()) {
-            result.addTask(createUpdateCubeInfoAfterBuildStep(seg, intermediateHiveTableStepId, baseCuboidStepId, baseCuboidStepId, jobId));
+            result.addTask(createUpdateCubeInfoAfterBuildStep(seg, jobId));
         } else {
             // convert htable
-            AbstractExecutable convertCuboidToHfileStep = addHTableSteps(seg, cuboidRootPath, result);
-
+            addHTableSteps(seg, cuboidRootPath, result);
             // update cube info
-            result.addTask(createUpdateCubeInfoAfterBuildStep(seg, intermediateHiveTableStepId, baseCuboidStepId, convertCuboidToHfileStep.getId(), jobId));
+            result.addTask(createUpdateCubeInfoAfterBuildStep(seg, jobId));
         }
 
-
         final CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
         final String hiveIntermediateTable = this.getIntermediateHiveTableName(intermediateTableDesc, jobId);
         result.addTask(createGarbageCollectionStep(seg, null, hiveIntermediateTable));
@@ -97,12 +97,10 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         final String mergedRootPath = getJobWorkingDir(jobId) + "/" + appendSegment.getCubeInstance().getName() + "/cuboid/";
 
         // cubing the incremental segment
-        Pair<AbstractExecutable, AbstractExecutable> twoSteps = addCubingSteps(appendSegment, appendRootPath, result);
-        final String intermediateHiveTableStepId = twoSteps.getFirst().getId();
-        final String baseCuboidStepId = twoSteps.getSecond().getId();
+        addCubingSteps(appendSegment, appendRootPath, result);
 
         // update the append segment info
-        result.addTask(createUpdateCubeInfoAfterBuildStep(appendSegment, intermediateHiveTableStepId, baseCuboidStepId, null, jobId));
+        result.addTask(createUpdateCubeInfoAfterBuildStep(appendSegment, jobId));
 
         List<CubeSegment> mergingSegments = mergeSegment.getCubeInstance().getMergingSegments(mergeSegment);
         Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
@@ -118,23 +116,20 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
                 mergingCuboidPaths.add(getPathToMerge(merging));
         }
 
-        AbstractExecutable convertCuboidToHfileStep;
-
         if (this.inMemoryCubing()) {
             // merge from HTable
-            convertCuboidToHfileStep = addMergeFromHBaseSteps(mergeSegment, mergingSegmentIds, mergingHTables, mergedRootPath, result);
-
+            addMergeFromHBaseSteps(mergeSegment, mergingSegmentIds, mergingHTables, mergedRootPath, result);
             // bulk load step
             result.addTask(createBulkLoadStep(mergeSegment, result.getId()));
         } else {
             // merge cuboid
             addMergeSteps(mergeSegment, mergingSegmentIds, mergingCuboidPaths, mergedRootPath, result);
             // convert htable
-            convertCuboidToHfileStep = addHTableSteps(mergeSegment, mergedRootPath, result);
+            addHTableSteps(mergeSegment, mergedRootPath, result);
         }
 
         // update cube info
-        result.addTask(createUpdateCubeInfoAfterMergeStep(mergeSegment, mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId));
+        result.addTask(createUpdateCubeInfoAfterMergeStep(mergeSegment, mergingSegmentIds, jobId));
 
         result.addTask(createGarbageCollectionStep(mergeSegment, mergingHTables, null));
 
@@ -159,23 +154,20 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
             mergingHTables.add(merging.getStorageLocationIdentifier());
         }
 
-
-        AbstractExecutable convertCuboidToHfileStep;
         if (this.inMemoryCubing()) {
             // merge from HTable
-            convertCuboidToHfileStep = addMergeFromHBaseSteps(seg, mergingSegmentIds, mergingHTables, mergedCuboidPath, result);
-
+            addMergeFromHBaseSteps(seg, mergingSegmentIds, mergingHTables, mergedCuboidPath, result);
             // bulk load step
             result.addTask(createBulkLoadStep(seg, result.getId()));
         } else {
             // merge cuboid
             addMergeSteps(seg, mergingSegmentIds, mergingCuboidPaths, mergedCuboidPath, result);
             // convert htable
-            convertCuboidToHfileStep = addHTableSteps(seg, mergedCuboidPath, result);
+            addHTableSteps(seg, mergedCuboidPath, result);
         }
 
         // update cube info
-        result.addTask(createUpdateCubeInfoAfterMergeStep(seg, mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId));
+        result.addTask(createUpdateCubeInfoAfterMergeStep(seg, mergingSegmentIds, jobId));
 
         result.addTask(createGarbageCollectionStep(seg, mergingHTables, null));
 
@@ -191,7 +183,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
     }
 
 
-    AbstractExecutable addMergeFromHBaseSteps(CubeSegment seg, List<String> mergingSegmentIds, List<String> mergingHTables, String mergedCuboidPath, CubingJob result) {
+    void addMergeFromHBaseSteps(CubeSegment seg, List<String> mergingSegmentIds, List<String> mergingHTables, String mergedCuboidPath, CubingJob result) {
 
         result.addTask(createMergeDictionaryStep(seg, mergingSegmentIds));
 
@@ -203,13 +195,10 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
 
         String formattedTables = StringUtils.join(mergingHTables, ",");
         String hFilePath = getHFilePath(seg, result.getId());
-        MapReduceExecutable writeHFileStep = createMergeCuboidDataFromHBaseStep(seg, formattedTables, hFilePath);
-        result.addTask(writeHFileStep);
-
-        return writeHFileStep;
+        result.addTask(createMergeCuboidDataFromHBaseStep(seg, formattedTables, hFilePath));
     }
 
-    Pair<AbstractExecutable, AbstractExecutable> addCubingSteps(CubeSegment seg, String cuboidRootPath, CubingJob result) {
+    void addCubingSteps(CubeSegment seg, String cuboidRootPath, CubingJob result) {
         final int groupRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getNCuboidBuildLevels();
         final int totalRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
 
@@ -223,12 +212,11 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         final AbstractExecutable intermediateHiveTableStep = createIntermediateHiveTableStep(intermediateTableDesc, jobId);
         result.addTask(intermediateHiveTableStep);
         result.addTask(createFactDistinctColumnsStep(seg, intermediateHiveTableName, jobId));
-        MapReduceExecutable baseCuboidStep = null;
+        
         if (!inMemoryCubing()) {
             result.addTask(createBuildDictionaryStep(seg, factDistinctColumnsPath));
             // base cuboid step
-            baseCuboidStep = createBaseCuboidStep(seg, intermediateHiveTableLocation, cuboidOutputTempPath);
-            result.addTask(baseCuboidStep);
+            result.addTask(createBaseCuboidStep(seg, intermediateHiveTableLocation, cuboidOutputTempPath));
 
             // n dim cuboid steps
             for (int i = 1; i <= groupRowkeyColumnsCount; i++) {
@@ -240,16 +228,13 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
             result.addTask(createSaveStatisticsStep(seg, getStatisticsPath(seg, jobId)));
             result.addTask(createBuildDictionaryStep(seg, factDistinctColumnsPath));
             result.addTask(createCreateHTableStep(seg));
-            baseCuboidStep = createInMemCubingStep(seg, intermediateHiveTableLocation, intermediateHiveTableName, cuboidOutputTempPath, result.getId());
-            result.addTask(baseCuboidStep);
+            result.addTask(createInMemCubingStep(seg, intermediateHiveTableLocation, intermediateHiveTableName, cuboidOutputTempPath, result.getId()));
             // bulk load step
             result.addTask(createBulkLoadStep(seg, result.getId()));
         }
-
-        return new Pair<AbstractExecutable, AbstractExecutable>(intermediateHiveTableStep, baseCuboidStep);
     }
 
-    AbstractExecutable addHTableSteps(CubeSegment seg, String cuboidRootPath, CubingJob result) {
+    void addHTableSteps(CubeSegment seg, String cuboidRootPath, CubingJob result) {
         final String jobId = result.getId();
         final String cuboidPath = cuboidRootPath + "*";
 
@@ -257,12 +242,9 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         // create htable step
         result.addTask(createCreateHTableStep(seg));
         // generate hfiles step
-        final MapReduceExecutable convertCuboidToHfileStep = createConvertCuboidToHfileStep(seg, cuboidPath, jobId);
-        result.addTask(convertCuboidToHfileStep);
+        result.addTask(createConvertCuboidToHfileStep(seg, cuboidPath, jobId));
         // bulk load step
         result.addTask(createBulkLoadStep(seg, jobId));
-
-        return convertCuboidToHfileStep;
     }
 
     private CubingJob initialJob(CubeSegment seg, String type) {
@@ -385,6 +367,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
 
         baseCuboidStep.setMapReduceParams(cmd.toString());
         baseCuboidStep.setMapReduceJobClass(InMemCuboidJob.class);
+        baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
         return baseCuboidStep;
     }
 
@@ -406,6 +389,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
 
         baseCuboidStep.setMapReduceParams(cmd.toString());
         baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class);
+        baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
         return baseCuboidStep;
     }
 
@@ -475,6 +459,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
 
         createHFilesStep.setMapReduceParams(cmd.toString());
         createHFilesStep.setMapReduceJobClass(CubeHFileJob.class);
+        createHFilesStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
 
         return createHFilesStep;
     }
@@ -495,14 +480,11 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
 
     }
 
-    private UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(CubeSegment seg, String createFlatTableStepId, String baseCuboidStepId, String convertToHFileStepId, String jobId) {
+    private UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(CubeSegment seg, String jobId) {
         final UpdateCubeInfoAfterBuildStep updateCubeInfoStep = new UpdateCubeInfoAfterBuildStep();
         updateCubeInfoStep.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
         updateCubeInfoStep.setCubeName(seg.getCubeInstance().getName());
         updateCubeInfoStep.setSegmentId(seg.getUuid());
-        updateCubeInfoStep.setCreateFlatTableStepId(createFlatTableStepId);
-        updateCubeInfoStep.setBaseCuboidStepId(baseCuboidStepId);
-        updateCubeInfoStep.setConvertToHFileStepId(convertToHFileStepId);
         updateCubeInfoStep.setCubingJobId(jobId);
         return updateCubeInfoStep;
     }
@@ -570,16 +552,16 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
 
         mergeCuboidDataStep.setMapReduceParams(cmd.toString());
         mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidFromHBaseJob.class);
+        mergeCuboidDataStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
         return mergeCuboidDataStep;
     }
 
-    private UpdateCubeInfoAfterMergeStep createUpdateCubeInfoAfterMergeStep(CubeSegment seg, List<String> mergingSegmentIds, String convertToHFileStepId, String jobId) {
+    private UpdateCubeInfoAfterMergeStep createUpdateCubeInfoAfterMergeStep(CubeSegment seg, List<String> mergingSegmentIds, String jobId) {
         UpdateCubeInfoAfterMergeStep result = new UpdateCubeInfoAfterMergeStep();
         result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
         result.setCubeName(seg.getCubeInstance().getName());
         result.setSegmentId(seg.getUuid());
         result.setMergingSegmentIds(mergingSegmentIds);
-        result.setConvertToHFileStepId(convertToHFileStepId);
         result.setCubingJobId(jobId);
         return result;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java b/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java
index 617cdb5..8ad1f7a 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java
@@ -18,21 +18,16 @@
 
 package org.apache.kylin.job.cube;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.cube.CubeInstance;
+import java.io.IOException;
+
 import org.apache.kylin.cube.CubeBuilder;
+import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.job.execution.Output;
-
-import java.io.IOException;
 
 /**
  */
@@ -40,9 +35,6 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
 
     private static final String SEGMENT_ID = "segmentId";
     private static final String CUBE_NAME = "cubeName";
-    private static final String CONVERT_TO_HFILE_STEP_ID = "convertToHFileStepId";
-    private static final String BASE_CUBOID_STEP_ID = "baseCuboidStepId";
-    private static final String CREATE_FLAT_TABLE_STEP_ID = "createFlatTableStepId";
     private static final String CUBING_JOB_ID = "cubingJobId";
 
     public UpdateCubeInfoAfterBuildStep() {
@@ -65,26 +57,6 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
         return getParam(SEGMENT_ID);
     }
 
-    public void setConvertToHFileStepId(String id) {
-        setParam(CONVERT_TO_HFILE_STEP_ID, id);
-    }
-
-    private String getConvertToHfileStepId() {
-        return getParam(CONVERT_TO_HFILE_STEP_ID);
-    }
-
-    public void setBaseCuboidStepId(String id) {
-        setParam(BASE_CUBOID_STEP_ID, id);
-    }
-
-    private String getBaseCuboidStepId() {
-        return getParam(BASE_CUBOID_STEP_ID);
-    }
-
-    public void setCreateFlatTableStepId(String id) {
-        setParam(CREATE_FLAT_TABLE_STEP_ID, id);
-    }
-
     public void setCubingJobId(String id) {
         setParam(CUBING_JOB_ID, id);
     }
@@ -98,32 +70,18 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
         final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
         final CubeInstance cube = cubeManager.getCube(getCubeName());
         final CubeSegment segment = cube.getSegmentById(getSegmentId());
-
-        Output baseCuboidOutput = executableManager.getOutput(getBaseCuboidStepId());
-        String sourceRecordsCount = baseCuboidOutput.getExtra().get(ExecutableConstants.SOURCE_RECORDS_COUNT);
-        Preconditions.checkState(StringUtils.isNotEmpty(sourceRecordsCount), "Can't get cube source record count.");
-        long sourceCount = Long.parseLong(sourceRecordsCount);
-
-        String sourceRecordsSize = baseCuboidOutput.getExtra().get(ExecutableConstants.SOURCE_RECORDS_SIZE);
-        Preconditions.checkState(StringUtils.isNotEmpty(sourceRecordsSize), "Can't get cube source record size.");
-        long sourceSize = Long.parseLong(sourceRecordsSize);
-
-        long size = 0;
-        boolean segmentReady = true;
-        if (!StringUtils.isBlank(getConvertToHfileStepId())) {
-            String cubeSizeString = executableManager.getOutput(getConvertToHfileStepId()).getExtra().get(ExecutableConstants.HDFS_BYTES_WRITTEN);
-            Preconditions.checkState(StringUtils.isNotEmpty(cubeSizeString), "Can't get cube segment size.");
-            size = Long.parseLong(cubeSizeString) / 1024;
-        } else {
-            // for the increment & merge case, the increment segment is only built to be merged, won't serve query by itself
-            segmentReady = false;
-        }
+        
+        CubingJob cubingJob = (CubingJob) executableManager.getJob(getCubingJobId());
+        long sourceCount = cubingJob.findSourceRecordCount();
+        long sourceSizeBytes = cubingJob.findSourceSizeBytes();
+        long cubeSizeBytes = cubingJob.findCubeSizeBytes();
+        boolean segmentReady = cubeSizeBytes > 0; // in the build+merge scenario the HFile conversion has not happened yet, so the cube size is still 0
 
         segment.setLastBuildJobID(getCubingJobId());
         segment.setLastBuildTime(System.currentTimeMillis());
-        segment.setSizeKB(size);
+        segment.setSizeKB(cubeSizeBytes / 1024);
         segment.setInputRecords(sourceCount);
-        segment.setInputRecordsSize(sourceSize);
+        segment.setInputRecordsSize(sourceSizeBytes);
 
         try {
             if (segmentReady) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java b/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java
index d50bd6a..47e5a31 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java
@@ -18,23 +18,22 @@
 
 package org.apache.kylin.job.cube;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import com.google.common.collect.Lists;
 
 /**
  */
@@ -43,7 +42,6 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
     private static final String CUBE_NAME = "cubeName";
     private static final String SEGMENT_ID = "segmentId";
     private static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
-    private static final String CONVERT_TO_HFILE_STEP_ID = "convertToHFileStepId";
     private static final String CUBING_JOB_ID = "cubingJobId";
 
     private final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
@@ -60,9 +58,9 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
         if (mergedSegment == null) {
             return new ExecuteResult(ExecuteResult.State.FAILED, "there is no segment with id:" + getSegmentId());
         }
-        String cubeSizeString = executableManager.getOutput(getConvertToHfileStepId()).getExtra().get(ExecutableConstants.HDFS_BYTES_WRITTEN);
-        Preconditions.checkState(StringUtils.isNotEmpty(cubeSizeString), "Can't get cube segment size.");
-        long cubeSize = Long.parseLong(cubeSizeString) / 1024;
+        
+        CubingJob cubingJob = (CubingJob) executableManager.getJob(getCubingJobId());
+        long cubeSizeBytes = cubingJob.findCubeSizeBytes();
 
         // collect source statistics
         List<String> mergingSegmentIds = getMergingSegmentIds();
@@ -78,7 +76,7 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
         }
 
         // update segment info
-        mergedSegment.setSizeKB(cubeSize);
+        mergedSegment.setSizeKB(cubeSizeBytes / 1024);
         mergedSegment.setInputRecords(sourceCount);
         mergedSegment.setInputRecordsSize(sourceSize);
         mergedSegment.setLastBuildJobID(getCubingJobId());
@@ -127,14 +125,6 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
         }
     }
 
-    public void setConvertToHFileStepId(String id) {
-        setParam(CONVERT_TO_HFILE_STEP_ID, id);
-    }
-
-    private String getConvertToHfileStepId() {
-        return getParam(CONVERT_TO_HFILE_STEP_ID);
-    }
-
     public void setCubingJobId(String id) {
         setParam(CUBING_JOB_ID, id);
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/job/execution/ChainedExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/ChainedExecutable.java b/job/src/main/java/org/apache/kylin/job/execution/ChainedExecutable.java
index f352ead..d5a7aae 100644
--- a/job/src/main/java/org/apache/kylin/job/execution/ChainedExecutable.java
+++ b/job/src/main/java/org/apache/kylin/job/execution/ChainedExecutable.java
@@ -24,6 +24,8 @@ import java.util.List;
  */
 public interface ChainedExecutable extends Executable {
 
-    List<? extends Executable> getTasks();
+    List<? extends AbstractExecutable> getTasks();
+    
+    void addTask(AbstractExecutable executable);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 19b0d74..6443762 100644
--- a/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -114,6 +114,7 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
         return null;
     }
 
+    @Override
     public void addTask(AbstractExecutable executable) {
         executable.setId(getId() + "-" + String.format("%02d", subTasks.size()));
         this.subTasks.add(executable);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java b/job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
index a2b310b..05f8c8e 100644
--- a/job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
+++ b/job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
@@ -280,9 +280,9 @@ public class ExecutableManager {
         result.setUuid(executable.getId());
         result.setType(executable.getClass().getName());
         result.setParams(executable.getParams());
-        if (executable instanceof DefaultChainedExecutable) {
+        if (executable instanceof ChainedExecutable) {
             List<ExecutablePO> tasks = Lists.newArrayList();
-            for (AbstractExecutable task : ((DefaultChainedExecutable) executable).getTasks()) {
+            for (AbstractExecutable task : ((ChainedExecutable) executable).getTasks()) {
                 tasks.add(parse(task));
             }
             result.setTasks(tasks);
@@ -304,9 +304,9 @@ public class ExecutableManager {
             result.setParams(executablePO.getParams());
             List<ExecutablePO> tasks = executablePO.getTasks();
             if (tasks != null && !tasks.isEmpty()) {
-                Preconditions.checkArgument(result instanceof DefaultChainedExecutable);
+                Preconditions.checkArgument(result instanceof ChainedExecutable);
                 for (ExecutablePO subTask: tasks) {
-                    ((DefaultChainedExecutable) result).addTask(parseTo(subTask));
+                    ((ChainedExecutable) result).addTask(parseTo(subTask));
                 }
             }
             return result;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/source/ITableSource.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/source/ITableSource.java b/job/src/main/java/org/apache/kylin/source/ITableSource.java
new file mode 100644
index 0000000..ae1dccc
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/source/ITableSource.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source;
+
+public interface ITableSource {
+
+    public <I> I adaptToBuildEngine(Class<I> engineInterface);
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
new file mode 100644
index 0000000..a0cd62e
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import org.apache.kylin.engine.mr.IMRJobFlowParticipant;
+import org.apache.kylin.engine.mr.IMRInput;
+
+public class HiveMRInput implements IMRInput {
+
+    @Override
+    public IMRJobFlowParticipant createBuildFlowParticipant() {
+        // TODO: not implemented yet -- callers currently receive null
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/source/hive/HiveTableSource.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/source/hive/HiveTableSource.java b/job/src/main/java/org/apache/kylin/source/hive/HiveTableSource.java
new file mode 100644
index 0000000..4265519
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/source/hive/HiveTableSource.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.source.ITableSource;
+
+public class HiveTableSource implements ITableSource {
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <I> I adaptToBuildEngine(Class<I> engineInterface) {
+        if (engineInterface == IMRInput.class) {
+            return (I) new HiveMRInput();
+        } else {
+            throw new RuntimeException("Cannot adapt to " + engineInterface);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/storage/IStorage.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/storage/IStorage.java b/job/src/main/java/org/apache/kylin/storage/IStorage.java
new file mode 100644
index 0000000..89b96e9
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/storage/IStorage.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage;
+
+import org.apache.kylin.metadata.realization.IRealization;
+
+public interface IStorage {
+    
+    public IStorageQuery createStorageQuery(IRealization realization);
+
+    public <I> I adaptToBuildEngine(Class<I> engineInterface);
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput.java b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput.java
new file mode 100644
index 0000000..00c2eef
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase;
+
+import org.apache.kylin.engine.mr.IMRJobFlowParticipant;
+import org.apache.kylin.engine.mr.IMROutput;
+
+public class HBaseMROutput implements IMROutput {
+
+    @Override
+    public IMRJobFlowParticipant createBuildFlowParticipant() {
+        // TODO: not implemented yet -- callers currently receive null
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/job/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
new file mode 100644
index 0000000..7951b44
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase;
+
+import org.apache.kylin.engine.mr.IMROutput;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.storage.IStorage;
+import org.apache.kylin.storage.IStorageQuery;
+
+public class HBaseStorage implements IStorage {
+
+    @Override
+    public IStorageQuery createStorageQuery(IRealization realization) {
+        // TODO: not implemented yet -- callers currently receive null
+        return null;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <I> I adaptToBuildEngine(Class<I> engineInterface) {
+        if (engineInterface == IMROutput.class) {
+            return (I) new HBaseMROutput();
+        } else {
+            throw new RuntimeException("Cannot adapt to " + engineInterface);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
index eaf668e..e938cbd 100644
--- a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
@@ -31,7 +31,7 @@ import org.apache.kylin.metadata.filter.TupleFilterSerializer;
 import org.apache.kylin.metadata.tuple.ITuple;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
 import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.storage.IStorageEngine;
+import org.apache.kylin.storage.IStorageQuery;
 import org.apache.kylin.storage.StorageEngineFactory;
 import org.apache.kylin.storage.hbase.coprocessor.DictCodeSystem;
 import org.slf4j.Logger;
@@ -111,7 +111,7 @@ public class OLAPEnumerator implements Enumerator<Object[]> {
         olapContext.resetSQLDigest();
 
         // query storage engine
-        IStorageEngine storageEngine = StorageEngineFactory.getStorageEngine(olapContext.realization);
+        IStorageQuery storageEngine = StorageEngineFactory.getStorageEngine(olapContext.realization);
         ITupleIterator iterator = storageEngine.search(olapContext.storageContext, olapContext.getSQLDigest(), olapContext.returnTupleInfo);
         if (logger.isDebugEnabled()) {
             logger.debug("return TupleIterator...");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/storage/src/main/java/org/apache/kylin/storage/ICachableStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/ICachableStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/ICachableStorageEngine.java
index 90c5c95..1fcd9b3 100644
--- a/storage/src/main/java/org/apache/kylin/storage/ICachableStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/ICachableStorageEngine.java
@@ -4,7 +4,7 @@ import com.google.common.collect.Range;
 
 /**
  */
-public interface ICachableStorageEngine extends IStorageEngine {
+public interface ICachableStorageEngine extends IStorageQuery {
     /**
      *
      * being dynamic => getVolatilePeriod() return not null

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/storage/src/main/java/org/apache/kylin/storage/IStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/IStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/IStorageEngine.java
deleted file mode 100644
index 46a94ca..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/IStorageEngine.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage;
-
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.storage.tuple.TupleInfo;
-
-/**
- * 
- * @author xjiang
- * 
- */
-public interface IStorageEngine {
-
-    ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo);
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/storage/src/main/java/org/apache/kylin/storage/IStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/IStorageQuery.java b/storage/src/main/java/org/apache/kylin/storage/IStorageQuery.java
new file mode 100644
index 0000000..f090ebb
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/IStorageQuery.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage;
+
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.storage.tuple.TupleInfo;
+
+/**
+ * Query interface handed out by StorageEngineFactory.getStorageEngine(); takes over the search() contract of the removed IStorageEngine.
+ * 
+ * @author xjiang
+ */
+public interface IStorageQuery {
+    /** Runs a search for {@code sqlDigest} using {@code context} and returns an iterator over result tuples; {@code returnTupleInfo} presumably describes the tuple layout (TODO confirm). */
+    ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo);
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java b/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
index 0d720ab..3afcd32 100644
--- a/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
+++ b/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
@@ -42,7 +42,7 @@ import com.google.common.base.Preconditions;
 public class StorageEngineFactory {
     private static boolean allowStorageLayerCache = true;
 
-    public static IStorageEngine getStorageEngine(IRealization realization) {
+    public static IStorageQuery getStorageEngine(IRealization realization) {
 
         if (realization.getType() == RealizationType.INVERTED_INDEX) {
             ICachableStorageEngine ret = new InvertedIndexStorageEngine((IIInstance) realization);
@@ -63,7 +63,7 @@ public class StorageEngineFactory {
         }
     }
 
-    private static IStorageEngine wrapWithCache(ICachableStorageEngine underlyingStorageEngine, IRealization realization) {
+    private static IStorageQuery wrapWithCache(ICachableStorageEngine underlyingStorageEngine, IRealization realization) {
         if (underlyingStorageEngine.isDynamic()) {
             return new CacheFledgedDynamicStorageEngine(underlyingStorageEngine, getPartitionCol(realization));
         } else {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
index 422e335..34a8066 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
@@ -11,13 +11,13 @@ import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
 import org.apache.kylin.metadata.realization.StreamSQLDigest;
 import org.apache.kylin.metadata.tuple.TeeTupleItrListener;
 import org.apache.kylin.storage.ICachableStorageEngine;
-import org.apache.kylin.storage.IStorageEngine;
+import org.apache.kylin.storage.IStorageQuery;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  */
-public abstract class AbstractCacheFledgedStorageEngine implements IStorageEngine, TeeTupleItrListener {
+public abstract class AbstractCacheFledgedStorageEngine implements IStorageQuery, TeeTupleItrListener {
     private static final Logger logger = LoggerFactory.getLogger(AbstractCacheFledgedStorageEngine.class);
     private static final String storageCacheTemplate = "StorageCache";
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
index 3698688..3243698 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
@@ -5,7 +5,7 @@ import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.SQLDigest;
 import org.apache.kylin.metadata.tuple.CompoundTupleIterator;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.storage.IStorageEngine;
+import org.apache.kylin.storage.IStorageQuery;
 import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.StorageEngineFactory;
 import org.apache.kylin.storage.tuple.TupleInfo;
@@ -14,14 +14,14 @@ import java.util.List;
 
 /**
  */
-public class HybridStorageEngine implements IStorageEngine {
+public class HybridStorageEngine implements IStorageQuery {
 
     private IRealization[] realizations;
-    private IStorageEngine[] storageEngines;
+    private IStorageQuery[] storageEngines;
 
     public HybridStorageEngine(HybridInstance hybridInstance) {
         this.realizations = hybridInstance.getRealizations();
-        storageEngines = new IStorageEngine[realizations.length];
+        storageEngines = new IStorageQuery[realizations.length];
         for (int i = 0; i < realizations.length; i++) {
             storageEngines[i] = StorageEngineFactory.getStorageEngine(realizations[i]);
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e38ddb78/storage/src/test/java/org/apache/kylin/storage/test/ITStorageTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/test/ITStorageTest.java b/storage/src/test/java/org/apache/kylin/storage/test/ITStorageTest.java
index 1e32ef5..2d75535 100644
--- a/storage/src/test/java/org/apache/kylin/storage/test/ITStorageTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/test/ITStorageTest.java
@@ -28,7 +28,7 @@ import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.SQLDigest;
 import org.apache.kylin.metadata.tuple.ITuple;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.storage.IStorageEngine;
+import org.apache.kylin.storage.IStorageQuery;
 import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.StorageEngineFactory;
 import org.apache.kylin.storage.hbase.ScanOutOfLimitException;
@@ -41,7 +41,7 @@ import static org.junit.Assert.assertTrue;
 
 public class ITStorageTest extends HBaseMetadataTestCase {
 
-    private IStorageEngine storageEngine;
+    private IStorageQuery storageEngine;
     private CubeInstance cube;
     private StorageContext context;