You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/07/02 13:04:54 UTC

incubator-kylin git commit: KYLIN-867 initial commit, backport Hybrid model to 0.7

Repository: incubator-kylin
Updated Branches:
  refs/heads/KYLIN-867 [created] c87aa4a3b


KYLIN-867 initial commit, backport Hybrid model to 0.7

Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/c87aa4a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/c87aa4a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/c87aa4a3

Branch: refs/heads/KYLIN-867
Commit: c87aa4a3b80e342dee39ad0827e2547a1f475891
Parents: e7fcc20
Author: shaofengshi <sh...@apache.org>
Authored: Thu Jul 2 19:04:25 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Jul 2 19:04:25 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/lock/JobLock.java   |   9 +
 .../apache/kylin/common/lock/MockJobLock.java   |  15 ++
 .../kylin/common/lock/ZookeeperJobLock.java     |  83 ++++++++
 .../kylin/common/persistence/ResourceStore.java |   1 +
 .../kylin/common/restclient/Broadcaster.java    |   2 +-
 .../org/apache/kylin/cube/CubeInstance.java     |  39 +++-
 .../apache/kylin/cube/cuboid/CuboidTest.java    |   1 -
 .../hybrid/test_kylin_hybrid_inner_join.json    |  14 ++
 .../hybrid/test_kylin_hybrid_left_join.json     |  14 ++
 .../invertedindex_desc/test_kylin_ii_desc.json  |   7 +-
 .../localmeta/project/default.json              |   6 -
 .../localmeta/project/onlyinner.json            |   4 -
 .../localmeta/project/onlyleft.json             |   4 -
 .../apache/kylin/invertedindex/IIInstance.java  |  36 ++++
 .../kylin/invertedindex/index/SliceBuilder.java |   3 -
 .../kylin/invertedindex/model/IIDesc.java       |  27 ++-
 .../kylin/invertedindex/model/IIDimension.java  |   9 +-
 .../invertedindex/IIDescManagerTest.java        |   2 +-
 .../invertedindex/IIInstanceTest.java           |   4 +-
 .../invertedindex/InvertedIndexLocalTest.java   |   6 +-
 .../java/org/apache/kylin/job/Scheduler.java    |   3 +-
 .../job/impl/threadpool/DefaultScheduler.java   |  49 +----
 .../kylin/job/BuildCubeWithEngineTest.java      |   5 +-
 .../apache/kylin/job/BuildIIWithEngineTest.java |   5 +-
 .../job/impl/threadpool/BaseSchedulerTest.java  |   3 +-
 .../apache/kylin/metadata/model/TblColRef.java  |   4 +
 .../metadata/realization/IRealization.java      |   8 +
 .../realization/RealizationRegistry.java        |   1 +
 .../metadata/realization/RealizationType.java   |   2 +-
 .../metadata/realization/SQLDigestUtil.java     | 114 +++++++++++
 .../metadata/tuple/CompoundTupleIterator.java   |  45 +++++
 .../kylin/metadata/tuple/ITupleIterator.java    |   8 +-
 .../apache/kylin/query/routing/RoutingRule.java |   4 +-
 .../AdjustForWeeklyMatchedRealization.java      |  25 ++-
 .../RoutingRules/RealizationPriorityRule.java   |   5 +-
 .../RoutingRules/RealizationSortRule.java       |  66 +++++++
 .../apache/kylin/query/test/IIQueryTest.java    |   3 +
 .../kylin/rest/controller/JobController.java    |   3 +-
 .../kylin/storage/StorageEngineFactory.java     |   6 +-
 .../storage/hbase/CubeSegmentTupleIterator.java |   5 +
 .../hbase/SerializedHBaseTupleIterator.java     |   5 +
 .../endpoint/EndpointTupleIterator.java         |  10 +
 .../kylin/storage/hybrid/HybridInstance.java    | 188 +++++++++++++++++++
 .../kylin/storage/hybrid/HybridManager.java     | 127 +++++++++++++
 .../storage/hybrid/HybridStorageEngine.java     |  69 +++++++
 .../endpoint/EndpointAggregationTest.java       |   1 +
 .../endpoint/TableRecordInfoTest.java           |   1 +
 .../kylin/storage/hybrid/HybridManagerTest.java |  45 +++++
 48 files changed, 979 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/common/src/main/java/org/apache/kylin/common/lock/JobLock.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/lock/JobLock.java b/common/src/main/java/org/apache/kylin/common/lock/JobLock.java
new file mode 100644
index 0000000..7fdb64c
--- /dev/null
+++ b/common/src/main/java/org/apache/kylin/common/lock/JobLock.java
@@ -0,0 +1,9 @@
+package org.apache.kylin.common.lock;
+
+/**
+ */
+public interface JobLock {
+    boolean lock();
+
+    void unlock();
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java b/common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java
new file mode 100644
index 0000000..230d4d8
--- /dev/null
+++ b/common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java
@@ -0,0 +1,15 @@
+package org.apache.kylin.common.lock;
+
+/**
+ */
+public class MockJobLock implements JobLock {
+    @Override
+    public boolean lock() {
+        return true;
+    }
+
+    @Override
+    public void unlock() {
+        return;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/common/src/main/java/org/apache/kylin/common/lock/ZookeeperJobLock.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/lock/ZookeeperJobLock.java b/common/src/main/java/org/apache/kylin/common/lock/ZookeeperJobLock.java
new file mode 100644
index 0000000..603894c
--- /dev/null
+++ b/common/src/main/java/org/apache/kylin/common/lock/ZookeeperJobLock.java
@@ -0,0 +1,83 @@
+package org.apache.kylin.common.lock;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ */
+public class ZookeeperJobLock implements JobLock {
+    private Logger logger = LoggerFactory.getLogger(ZookeeperJobLock.class);
+
+    private static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
+
+    private String scheduleID;
+    private InterProcessMutex sharedLock;
+    private CuratorFramework zkClient;
+
+    @Override
+    public boolean lock() {
+        this.scheduleID = schedulerId();
+        String ZKConnectString = getZKConnectString();
+        if (StringUtils.isEmpty(ZKConnectString)) {
+            throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
+        }
+
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        this.zkClient = CuratorFrameworkFactory.newClient(ZKConnectString, retryPolicy);
+        this.zkClient.start();
+        this.sharedLock = new InterProcessMutex(zkClient, this.scheduleID);
+        boolean hasLock = false;
+        try {
+            hasLock = sharedLock.acquire(3, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            logger.warn("error acquire lock", e);
+        }
+        if (!hasLock) {
+            logger.warn("fail to acquire lock, scheduler has not been started");
+            zkClient.close();
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public void unlock() {
+        releaseLock();
+    }
+
+    private String getZKConnectString() {
+        Configuration conf = HadoopUtil.newHBaseConfiguration(KylinConfig.getInstanceFromEnv().getStorageUrl());
+        return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    }
+
+    private void releaseLock() {
+        try {
+            if (zkClient.getState().equals(CuratorFrameworkState.STARTED)) {
+                // client.setData().forPath(ZOOKEEPER_LOCK_PATH, null);
+                if (zkClient.checkExists().forPath(scheduleID) != null) {
+                    zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(scheduleID);
+                }
+            }
+        } catch (Exception e) {
+            logger.error("error release lock:" + scheduleID);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private String schedulerId() {
+        return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 7021915..a49706d 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -51,6 +51,7 @@ abstract public class ResourceStore {
     public static final String SNAPSHOT_RESOURCE_ROOT = "/table_snapshot";
     public static final String TABLE_EXD_RESOURCE_ROOT = "/table_exd";
     public static final String TABLE_RESOURCE_ROOT = "/table";
+    public static final String HYBRID_RESOURCE_ROOT = "/hybrid";
 
 
     private static ConcurrentHashMap<KylinConfig, ResourceStore> CACHE = new ConcurrentHashMap<KylinConfig, ResourceStore>();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java b/common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
index b636e89..68e68c3 100644
--- a/common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
+++ b/common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
@@ -139,7 +139,7 @@ public class Broadcaster {
     }
 
     public static enum TYPE {
-        ALL("all"), CUBE("cube"), CUBE_DESC("cube_desc"), PROJECT("project"), INVERTED_INDEX("inverted_index"), INVERTED_INDEX_DESC("ii_desc"), TABLE("table"), DATA_MODEL("data_model");
+        ALL("all"), CUBE("cube"), CUBE_DESC("cube_desc"), PROJECT("project"), INVERTED_INDEX("inverted_index"), INVERTED_INDEX_DESC("ii_desc"), TABLE("table"), DATA_MODEL("data_model"), HYBRID("hybrid");
         private String text;
 
         private TYPE(String text) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index d978887..898925a 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -327,7 +327,7 @@ public class CubeInstance extends RootPersistentEntity implements IRealization {
 
     @Override
     public int getCost(SQLDigest digest) {
-        return 0;
+        return cost;
     }
 
     @Override
@@ -348,4 +348,41 @@ public class CubeInstance extends RootPersistentEntity implements IRealization {
         this.projectName = projectName;
     }
 
+
+    @Override
+    public long getDateRangeStart() {
+        List<CubeSegment> readySegs = getSegments(SegmentStatusEnum.READY);
+
+        long startTime = Long.MAX_VALUE;
+        for (CubeSegment seg : readySegs) {
+            if (seg.getDateRangeStart() < startTime)
+                startTime = seg.getDateRangeStart();
+        }
+
+        return startTime;
+    }
+
+    @Override
+    public long getDateRangeEnd() {
+
+        List<CubeSegment> readySegs = getSegments(SegmentStatusEnum.READY);
+
+        long endTime = 0;
+        for (CubeSegment seg : readySegs) {
+            if (seg.getDateRangeEnd() > endTime)
+                endTime = seg.getDateRangeEnd();
+        }
+
+        return endTime;
+    }
+
+    @Override
+    public String getModelName() {
+        return this.getDescriptor().getModelName();
+    }
+
+    @Override
+    public List<TblColRef> getAllDimensions() {
+        return Lists.newArrayList(getDescriptor().listDimensionColumnsIncludingDerived());
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidTest.java
----------------------------------------------------------------------
diff --git a/cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidTest.java b/cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidTest.java
index 9f7cbc6..4914448 100644
--- a/cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidTest.java
+++ b/cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidTest.java
@@ -173,7 +173,6 @@ public class CuboidTest extends LocalFileMetadataTestCase {
         assertEquals(toLong("111111111"), cuboid.getId());
     }
 
-    //@Test
     public void testII() {
         CubeDesc cube = getTestKylinCubeII();
         assertEquals(toLong("111111111"), Cuboid.getBaseCuboidId(cube));

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/examples/test_case_data/localmeta/hybrid/test_kylin_hybrid_inner_join.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/hybrid/test_kylin_hybrid_inner_join.json b/examples/test_case_data/localmeta/hybrid/test_kylin_hybrid_inner_join.json
new file mode 100644
index 0000000..5582841
--- /dev/null
+++ b/examples/test_case_data/localmeta/hybrid/test_kylin_hybrid_inner_join.json
@@ -0,0 +1,14 @@
+{
+  "uuid": "9iiu8590-64b6-4367-8fb5-7500eb95fd9c",
+  "name": "test_kylin_hybrid_inner_join",
+  "historyRealization": {
+    "type": "CUBE",
+    "realization": "test_kylin_cube_without_slr_empty"
+  },
+  "realTimeRealization": {
+    "type": "INVERTED_INDEX",
+    "realization": "test_kylin_ii"
+  },
+  "last_modified": 1420016227424,
+  "create_time": null
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/examples/test_case_data/localmeta/hybrid/test_kylin_hybrid_left_join.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/hybrid/test_kylin_hybrid_left_join.json b/examples/test_case_data/localmeta/hybrid/test_kylin_hybrid_left_join.json
new file mode 100644
index 0000000..37f59d6
--- /dev/null
+++ b/examples/test_case_data/localmeta/hybrid/test_kylin_hybrid_left_join.json
@@ -0,0 +1,14 @@
+{
+  "uuid": "5ca78590-64b6-4367-8fb5-7500eb95fd9c",
+  "name": "test_kylin_hybrid_left_join",
+  "historyRealization": {
+    "type": "CUBE",
+    "realization": "test_kylin_cube_with_slr_left_join_empty"
+  },
+  "realTimeRealization": {
+    "type": "INVERTED_INDEX",
+    "realization": "test_kylin_ii"
+  },
+  "last_modified": 1420016227424,
+  "create_time": null
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/examples/test_case_data/localmeta/invertedindex_desc/test_kylin_ii_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/invertedindex_desc/test_kylin_ii_desc.json b/examples/test_case_data/localmeta/invertedindex_desc/test_kylin_ii_desc.json
index 962d887..a5ee5e5 100644
--- a/examples/test_case_data/localmeta/invertedindex_desc/test_kylin_ii_desc.json
+++ b/examples/test_case_data/localmeta/invertedindex_desc/test_kylin_ii_desc.json
@@ -1,12 +1,11 @@
 {
   "uuid": "74bf87b5-c7b5-4420-a12a-07f6b37b3187",
-
   "last_modified": 0,
   "name": "test_kylin_ii_desc",
   "fact_table": "default.test_kylin_fact",
   "model_name": "test_kylin_ii_model_desc",
   "timestamp_dimension": "cal_dt",
-  "bitmap_dimensions": [
+  "value_dimensions": [
     {
       "table": "default.test_kylin_fact",
       "columns": [
@@ -43,9 +42,7 @@
         "seller_type_cd",
         "seller_type_desc"
       ]
-    }
-  ],
-  "value_dimensions": [
+    },
     {
       "table": "edw.test_cal_dt",
       "columns": [

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/examples/test_case_data/localmeta/project/default.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/project/default.json b/examples/test_case_data/localmeta/project/default.json
index c6b8b74..853a10a 100644
--- a/examples/test_case_data/localmeta/project/default.json
+++ b/examples/test_case_data/localmeta/project/default.json
@@ -27,11 +27,5 @@
       "type": "INVERTED_INDEX",
       "realization": "test_kylin_ii"
     }
-  ],
-  "cubes": [
-    "test_kylin_cube_with_slr_empty",
-    "test_kylin_cube_without_slr_empty",
-    "test_kylin_cube_with_slr_left_join_empty",
-    "test_kylin_cube_without_slr_left_join_empty"
   ]
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/examples/test_case_data/localmeta/project/onlyinner.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/project/onlyinner.json b/examples/test_case_data/localmeta/project/onlyinner.json
index b98d762..a3ab387 100644
--- a/examples/test_case_data/localmeta/project/onlyinner.json
+++ b/examples/test_case_data/localmeta/project/onlyinner.json
@@ -12,9 +12,5 @@
       "type": "CUBE",
       "realization": "test_kylin_cube_without_slr_empty"
     }
-  ],
-  "cubes": [
-    "test_kylin_cube_with_slr_empty",
-    "test_kylin_cube_without_slr_empty"
   ]
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/examples/test_case_data/localmeta/project/onlyleft.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/project/onlyleft.json b/examples/test_case_data/localmeta/project/onlyleft.json
index 9569eea..014ff4b 100644
--- a/examples/test_case_data/localmeta/project/onlyleft.json
+++ b/examples/test_case_data/localmeta/project/onlyleft.json
@@ -12,9 +12,5 @@
       "type": "CUBE",
       "realization": "test_kylin_cube_without_slr_left_join_empty"
     }
-  ],
-  "cubes": [
-    "test_kylin_cube_with_slr_left_join_empty",
-    "test_kylin_cube_without_slr_left_join_empty"
   ]
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
index eeb06ed..9c46fbb 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
@@ -310,4 +310,40 @@ public class IIInstance extends RootPersistentEntity implements IRealization {
         this.projectName = projectName;
     }
 
+    @Override
+    public long getDateRangeStart() {
+        List<IISegment> readySegs = getSegments(SegmentStatusEnum.READY);
+
+        long startTime = Long.MAX_VALUE;
+        for (IISegment seg : readySegs) {
+            if (seg.getDateRangeStart() < startTime)
+                startTime = seg.getDateRangeStart();
+        }
+
+        return startTime;
+    }
+
+    @Override
+    public long getDateRangeEnd() {
+
+        List<IISegment> readySegs = getSegments(SegmentStatusEnum.READY);
+
+        long endTime = 0;
+        for (IISegment seg : readySegs) {
+            if (seg.getDateRangeEnd() > endTime)
+                endTime = seg.getDateRangeEnd();
+        }
+
+        return endTime;
+    }
+
+    @Override
+    public String getModelName() {
+        return this.getDescriptor().getModelName();
+    }
+
+    @Override
+    public List<TblColRef> getAllDimensions() {
+        return getDescriptor().listAllDimensions();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
index e217df0..72050a1 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
@@ -61,9 +61,6 @@ public class SliceBuilder {
 		// reset for next slice
 		nRecords = 0;
 		containers = new ColumnValueContainer[nColumns];
-		for (int i : info.getDescriptor().getBitmapColumns()) {
-			containers[i] = new BitMapContainer(info.getDigest(), i);
-		}
 		for (int i : info.getDescriptor().getValueColumns()) {
 			containers[i] = new CompressedValueContainer(info.getDigest(), i,
 					nRecordsCap);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
index 75cea57..fca2b93 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
@@ -66,8 +66,6 @@ public class IIDesc extends RootPersistentEntity {
     private String modelName;
     @JsonProperty("timestamp_dimension")
     private String timestampDimension;
-    @JsonProperty("bitmap_dimensions")
-    private List<IIDimension> bitmapDimensions;
     @JsonProperty("value_dimensions")
     private List<IIDimension> valueDimensions;
     @JsonProperty("metrics")
@@ -82,8 +80,8 @@ public class IIDesc extends RootPersistentEntity {
     // computed
     private List<TableDesc> allTables = Lists.newArrayList();
     private List<TblColRef> allColumns = Lists.newArrayList();
+    private List<TblColRef> allDimensions = Lists.newArrayList();
     private int tsCol;
-    private int[] bitmapCols;
     private int[] valueCols;
     private int[] metricsCols;
     private BitSet metricsColSet;
@@ -106,7 +104,6 @@ public class IIDesc extends RootPersistentEntity {
         timestampDimension = timestampDimension.toUpperCase();
 
         // capitalize
-        IIDimension.capicalizeStrings(bitmapDimensions);
         IIDimension.capicalizeStrings(valueDimensions);
         StringUtil.toUpperCaseArray(metricNames, metricNames);
 
@@ -114,11 +111,13 @@ public class IIDesc extends RootPersistentEntity {
         HashSet<String> allTableNames = Sets.newHashSet();
         measureDescs = Lists.newArrayList();
         measureDescs.add(makeCountMeasure());
-        for (IIDimension iiDimension : Iterables.concat(bitmapDimensions, valueDimensions)) {
+        for (IIDimension iiDimension : valueDimensions) {
             TableDesc tableDesc = this.getTableDesc(iiDimension.getTable());
             for (String column : iiDimension.getColumns()) {
                 ColumnDesc columnDesc = tableDesc.findColumnByName(column);
-                allColumns.add(new TblColRef(columnDesc));
+                TblColRef tcr = new TblColRef(columnDesc);
+                allColumns.add(tcr);
+                allDimensions.add(tcr);
                 measureDescs.add(makeHLLMeasure(columnDesc, null));
             }
 
@@ -141,15 +140,11 @@ public class IIDesc extends RootPersistentEntity {
         }
 
         // indexing for each type of columns
-        bitmapCols = new int[IIDimension.getColumnCount(bitmapDimensions)];
         valueCols = new int[IIDimension.getColumnCount(valueDimensions)];
         metricsCols = new int[metricNames.length];
         metricsColSet = new BitSet(this.getTableDesc(this.getFactTableName()).getColumnCount());
 
         int totalIndex = 0;
-        for (int i = 0; i < bitmapCols.length; ++i, ++totalIndex) {
-            bitmapCols[i] = totalIndex;
-        }
         for (int i = 0; i < valueCols.length; ++i, ++totalIndex) {
             valueCols[i] = totalIndex;
         }
@@ -169,7 +164,7 @@ public class IIDesc extends RootPersistentEntity {
             }
         }
         if (tsCol < 0)
-            throw new RuntimeException("timestamp_dimension is not in bitmapDimensions or valueDimensions");
+            throw new RuntimeException("timestamp_dimension is not in valueDimensions");
     }
 
     private TableDesc getTableDesc(String tableName) {
@@ -257,6 +252,10 @@ public class IIDesc extends RootPersistentEntity {
         return allColumns;
     }
 
+    public List<TblColRef> listAllDimensions() {
+        return allDimensions;
+    }
+
     public TblColRef findColumnRef(String table, String column) {
         ColumnDesc columnDesc = this.getTableDesc(table).findColumnByName(column);
         return new TblColRef(columnDesc);
@@ -294,10 +293,6 @@ public class IIDesc extends RootPersistentEntity {
         return tsCol;
     }
 
-    public int[] getBitmapColumns() {
-        return bitmapCols;
-    }
-
     public int[] getValueColumns() {
         return valueCols;
     }
@@ -354,7 +349,7 @@ public class IIDesc extends RootPersistentEntity {
         try {
             md = MessageDigest.getInstance("MD5");
             StringBuilder sigString = new StringBuilder();
-            sigString.append(this.name).append("|").append(this.getFactTableName()).append("|").append(timestampDimension).append("|").append(JsonUtil.writeValueAsString(this.bitmapDimensions)).append("|").append(JsonUtil.writeValueAsString(valueDimensions)).append("|").append(JsonUtil.writeValueAsString(this.metricNames)).append("|").append(sharding).append("|").append(sliceSize);
+            sigString.append(this.name).append("|").append(this.getFactTableName()).append("|").append(timestampDimension).append("|").append("|").append(JsonUtil.writeValueAsString(valueDimensions)).append("|").append(JsonUtil.writeValueAsString(this.metricNames)).append("|").append(sharding).append("|").append(sliceSize);
 
             byte[] signature = md.digest(sigString.toString().getBytes());
             return new String(Base64.encodeBase64(signature));

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDimension.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDimension.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDimension.java
index ee8b6b9..d2d377d 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDimension.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDimension.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.invertedindex.model;
 
+import java.util.Arrays;
 import java.util.List;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
@@ -52,9 +53,11 @@ public class IIDimension {
 
 
     public static void capicalizeStrings(List<IIDimension> dimensions) {
-        for (IIDimension iiDimension : dimensions) {
-            iiDimension.setTable(iiDimension.getTable().toUpperCase());
-            StringUtil.toUpperCaseArray(iiDimension.getColumns(), iiDimension.getColumns());
+        if (dimensions != null) {
+            for (IIDimension iiDimension : dimensions) {
+                iiDimension.setTable(iiDimension.getTable().toUpperCase());
+                StringUtil.toUpperCaseArray(iiDimension.getColumns(), iiDimension.getColumns());
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIDescManagerTest.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIDescManagerTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIDescManagerTest.java
index 5782a82..bd01b2c 100644
--- a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIDescManagerTest.java
+++ b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIDescManagerTest.java
@@ -54,7 +54,7 @@ public class IIDescManagerTest extends LocalFileMetadataTestCase {
     public void testCRUD() throws IOException {
         IIDescManager mgr = IIDescManager.getInstance(getTestConfig());
 
-        String newDescName = "Copy of " + TEST_II_DESC_NAME;
+        String newDescName = "Copy_of_" + TEST_II_DESC_NAME;
 
         try {
             IIDesc testRecord = mgr.getIIDesc(newDescName);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
index 792d674..4eb11be 100644
--- a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
+++ b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
@@ -70,7 +70,7 @@ public class IIInstanceTest extends LocalFileMetadataTestCase {
 
         IIDesc iiDesc = IIDescManager.getInstance(getTestConfig()).getIIDesc("test_kylin_ii_desc");
 
-        IIInstance ii = IIInstance.create("new ii", "default", iiDesc);
+        IIInstance ii = IIInstance.create("new_ii", "default", iiDesc);
 
         IIManager iiMgr = IIManager.getInstance(getTestConfig());
 
@@ -78,7 +78,7 @@ public class IIInstanceTest extends LocalFileMetadataTestCase {
 
         iiMgr.createII(ii);
 
-        Assert.assertNotNull(iiMgr.getII("new ii"));
+        Assert.assertNotNull(iiMgr.getII("new_ii"));
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
index 42de647..45eb9b5 100644
--- a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
+++ b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.kylin.common.util.Pair;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.File;
@@ -62,6 +63,7 @@ public class InvertedIndexLocalTest extends LocalFileMetadataTestCase {
 	}
 
 	@Test
+	@Ignore
 	public void testBitMapContainer() {
 		// create container
 		BitMapContainer container = new BitMapContainer(info.getDigest(), 0);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/job/src/main/java/org/apache/kylin/job/Scheduler.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/Scheduler.java b/job/src/main/java/org/apache/kylin/job/Scheduler.java
index 120b86f..87ce072 100644
--- a/job/src/main/java/org/apache/kylin/job/Scheduler.java
+++ b/job/src/main/java/org/apache/kylin/job/Scheduler.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.job;
 
+import org.apache.kylin.common.lock.JobLock;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.exception.SchedulerException;
 import org.apache.kylin.job.execution.Executable;
@@ -27,7 +28,7 @@ import org.apache.kylin.job.execution.Executable;
  */
 public interface Scheduler<T extends Executable> {
 
-    void init(JobEngineConfig jobEngineConfig) throws SchedulerException;
+    void init(JobEngineConfig jobEngineConfig, JobLock jobLock) throws SchedulerException;
 
     void shutdown() throws SchedulerException;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 6483035..d69e74d 100644
--- a/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -26,6 +26,7 @@ import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.kylin.common.lock.JobLock;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.commons.lang.StringUtils;
 import org.apache.curator.RetryPolicy;
@@ -143,29 +144,6 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
         }
     }
 
-    private void releaseLock() {
-        try {
-            if (zkClient.getState().equals(CuratorFrameworkState.STARTED)) {
-                // client.setData().forPath(ZOOKEEPER_LOCK_PATH, null);
-                if (zkClient.checkExists().forPath(schedulerId()) != null) {
-                    zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(schedulerId());
-                }
-            }
-        } catch (Exception e) {
-            logger.error("error release lock:" + schedulerId());
-            throw new RuntimeException(e);
-        }
-    }
-
-    private String schedulerId() {
-        return ZOOKEEPER_LOCK_PATH + "/" + jobEngineConfig.getConfig().getMetadataUrlPrefix();
-    }
-
-    private String getZKConnectString(JobEngineConfig context) {
-        Configuration conf = HadoopUtil.newHBaseConfiguration(context.getConfig().getStorageUrl());
-        return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
-    }
-
     public static DefaultScheduler getInstance() {
         return INSTANCE;
     }
@@ -182,33 +160,16 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
     }
 
     @Override
-    public synchronized void init(JobEngineConfig jobEngineConfig) throws SchedulerException {
+    public synchronized void init(JobEngineConfig jobEngineConfig, final JobLock jobLock) throws SchedulerException {
         if (!initialized) {
             initialized = true;
         } else {
             return;
         }
-        String ZKConnectString = getZKConnectString(jobEngineConfig);
-        if (StringUtils.isEmpty(ZKConnectString)) {
-            throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
-        }
 
         this.jobEngineConfig = jobEngineConfig;
-        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
-        this.zkClient = CuratorFrameworkFactory.newClient(ZKConnectString, retryPolicy);
-        this.zkClient.start();
-        this.sharedLock = new InterProcessMutex(zkClient, schedulerId());
-        boolean hasLock = false;
-        try {
-            hasLock = sharedLock.acquire(3, TimeUnit.SECONDS);
-        } catch (Exception e) {
-            logger.warn("error acquire lock", e);
-        }
-        if (!hasLock) {
-            logger.warn("fail to acquire lock, scheduler has not been started");
-            zkClient.close();
-            return;
-        }
+        jobLock.lock();
+
         executableManager = ExecutableManager.getInstance(jobEngineConfig.getConfig());
         //load all executable, set them to a consistent status
         fetcherPool = Executors.newScheduledThreadPool(1);
@@ -228,6 +189,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
                 logger.debug("Closing zk connection");
                 try {
                     shutdown();
+                    jobLock.unlock();
                 } catch (SchedulerException e) {
                     logger.error("error shutdown scheduler", e);
                 }
@@ -243,7 +205,6 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
     public void shutdown() throws SchedulerException {
         fetcherPool.shutdown();
         jobPool.shutdown();
-        releaseLock();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
index a5a1156..7fd4246 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.lock.ZookeeperJobLock;
 import org.apache.kylin.common.util.AbstractKylinTestCase;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
@@ -96,7 +97,7 @@ public class BuildCubeWithEngineTest {
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         jobService = ExecutableManager.getInstance(kylinConfig);
         scheduler = DefaultScheduler.getInstance();
-        scheduler.init(new JobEngineConfig(kylinConfig));
+        scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
         if (!scheduler.hasStarted()) {
             throw new RuntimeException("scheduler has not been started");
         }
@@ -212,7 +213,7 @@ public class BuildCubeWithEngineTest {
 
         // this cube's start date is 0, end date is 20501112000000
         long date1 = 0;
-        long date2 = f.parse("2013-01-01").getTime();
+        long date2 = f.parse("2022-01-01").getTime();
 
 
         // this cube doesn't support incremental build, always do full build

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/job/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java b/job/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
index 194a8d7..6dacc08 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.Future;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.lock.ZookeeperJobLock;
 import org.apache.kylin.job.hadoop.cube.StorageCleanupJob;
 import org.junit.After;
 import org.junit.Before;
@@ -108,7 +109,7 @@ public class BuildIIWithEngineTest {
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         jobService = ExecutableManager.getInstance(kylinConfig);
         scheduler = DefaultScheduler.getInstance();
-        scheduler.init(new JobEngineConfig(kylinConfig));
+        scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
         if (!scheduler.hasStarted()) {
             throw new RuntimeException("scheduler has not been started");
         }
@@ -134,7 +135,7 @@ public class BuildIIWithEngineTest {
             ii.setStatus(RealizationStatusEnum.READY);
             iiManager.updateII(ii);
         }
-        backup();
+       // backup();
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
index 9da0f73..29ef7ad 100644
--- a/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.job.impl.threadpool;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.lock.ZookeeperJobLock;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
@@ -87,7 +88,7 @@ public abstract class BaseSchedulerTest extends HBaseMetadataTestCase {
         setFinalStatic(ExecutableConstants.class.getField("DEFAULT_SCHEDULER_INTERVAL_SECONDS"), 10);
         jobService = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
         scheduler = DefaultScheduler.getInstance();
-        scheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv()));
+        scheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv()), new ZookeeperJobLock());
         if (!scheduler.hasStarted()) {
             throw new RuntimeException("scheduler has not been started");
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java b/metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
index 76c3911..ca932e9 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
@@ -67,6 +67,10 @@ public class TblColRef {
         this.column = column;
     }
 
+    public ColumnDesc getColumnDesc() {
+        return column;
+    }
+
     public ColumnDesc getColumn() {
         return column;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java b/metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
index d1571ce..43f5e9a 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
@@ -51,6 +51,8 @@ public interface IRealization {
 
     public List<TblColRef> getAllColumns();
 
+    public List<TblColRef> getAllDimensions();
+
     public List<MeasureDesc> getMeasures();
 
     public boolean isReady();
@@ -63,4 +65,10 @@ public interface IRealization {
     
     public void setProjectName(String prjName);
 
+    public long getDateRangeStart();
+
+    public long getDateRangeEnd();
+
+    public String getModelName();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationRegistry.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationRegistry.java b/metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationRegistry.java
index c05497e..2c7ba42 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationRegistry.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationRegistry.java
@@ -111,6 +111,7 @@ public class RealizationRegistry {
         IRealizationProvider p = providers.get(type);
         if (p == null) {
             logger.warn("No provider for realization type " + type);
+            return null;
         }
 
         try {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationType.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationType.java b/metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationType.java
index 64e3ab7..1b5dbc5 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationType.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationType.java
@@ -24,5 +24,5 @@ package org.apache.kylin.metadata.realization;
 
 //TODO: change to String for plugin
 public enum RealizationType {
-    CUBE, INVERTED_INDEX
+    CUBE, INVERTED_INDEX, HYBRID
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java b/metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
new file mode 100644
index 0000000..72e3b10
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
@@ -0,0 +1,114 @@
+package org.apache.kylin.metadata.realization;
+
+import com.google.common.base.Function;
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Range;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.metadata.filter.*;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ */
+public class SQLDigestUtil {
+
+    public static <F, T> T appendTsFilterToExecute(SQLDigest sqlDigest, TblColRef partitionColRef, Range<Long> tsRange, Function<F, T> action) {
+
+        // add the boundary condition to query real-time
+        TupleFilter originalFilter = sqlDigest.filter;
+        sqlDigest.filter = createFilterForRealtime(originalFilter, partitionColRef, tsRange);
+
+        boolean addFilterColumn = false, addAllColumn = false;
+
+        if (!sqlDigest.filterColumns.contains(partitionColRef)) {
+            sqlDigest.filterColumns.add(partitionColRef);
+            addFilterColumn = true;
+        }
+
+        if (!sqlDigest.allColumns.contains(partitionColRef)) {
+            sqlDigest.allColumns.add(partitionColRef);
+            addAllColumn = true;
+        }
+
+        T ret = action.apply(null);
+
+        // restore the sqlDigest
+        sqlDigest.filter = originalFilter;
+
+        if (addFilterColumn)
+            sqlDigest.filterColumns.remove(partitionColRef);
+
+        if (addAllColumn)
+            sqlDigest.allColumns.remove(partitionColRef);
+
+        return ret;
+    }
+
+    //ts column type differentiate
+    private static String formatTimeStr(DataType type, long ts) {
+        String ret;
+        if (type == DataType.getInstance("date")) {
+            ret = DateFormat.formatToDateStr(ts);
+        } else if (type == DataType.getInstance("long")) {
+            ret = String.valueOf(ts);
+        } else {
+            throw new IllegalArgumentException("Illegal type for partition column " + type);
+        }
+        return ret;
+    }
+
+    private static TupleFilter createFilterForRealtime(TupleFilter originFilter, TblColRef partitionColRef, Range<Long> tsRange) {
+        DataType type = partitionColRef.getColumnDesc().getType();
+
+        String startTimeStr, endTimeStr;
+        CompareTupleFilter startFilter = null, endFilter = null;
+        if (tsRange.hasLowerBound()) {
+            startTimeStr = formatTimeStr(type, tsRange.lowerEndpoint());
+            if (tsRange.lowerBoundType() == BoundType.CLOSED) {
+                startFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GTE);
+            } else {
+                startFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT);
+            }
+            ColumnTupleFilter columnTupleFilter = new ColumnTupleFilter(partitionColRef);
+            ConstantTupleFilter constantTupleFilter = new ConstantTupleFilter(startTimeStr);
+            startFilter.addChild(columnTupleFilter);
+            startFilter.addChild(constantTupleFilter);
+        }
+
+        if (tsRange.hasUpperBound()) {
+            endTimeStr = formatTimeStr(type, tsRange.upperEndpoint());
+            if (tsRange.upperBoundType() == BoundType.CLOSED) {
+                endFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.LTE);
+            } else {
+                endFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.LT);
+            }
+            ColumnTupleFilter columnTupleFilter = new ColumnTupleFilter(partitionColRef);
+            ConstantTupleFilter constantTupleFilter = new ConstantTupleFilter(endTimeStr);
+            endFilter.addChild(columnTupleFilter);
+            endFilter.addChild(constantTupleFilter);
+        }
+
+        if (originFilter == null) {
+            if (endFilter == null) {
+                return startFilter;
+            }
+            if (startFilter == null) {
+                return endFilter;
+            }
+        }
+
+        LogicalTupleFilter logicalTupleFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.AND);
+
+        if (originFilter != null) {
+            logicalTupleFilter.addChild(originFilter);
+        }
+        if (startFilter != null) {
+            logicalTupleFilter.addChild(startFilter);
+        }
+        if (endFilter != null) {
+            logicalTupleFilter.addChild(endFilter);
+        }
+
+        return logicalTupleFilter;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/metadata/src/main/java/org/apache/kylin/metadata/tuple/CompoundTupleIterator.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/tuple/CompoundTupleIterator.java b/metadata/src/main/java/org/apache/kylin/metadata/tuple/CompoundTupleIterator.java
new file mode 100644
index 0000000..f5d8dd6
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/metadata/tuple/CompoundTupleIterator.java
@@ -0,0 +1,45 @@
+package org.apache.kylin.metadata.tuple;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ */
+public class CompoundTupleIterator implements ITupleIterator {
+    private static final Logger logger = LoggerFactory.getLogger(CompoundTupleIterator.class);
+    private List<ITupleIterator> backends;
+    private Iterator<ITuple> compoundIterator;
+
+    public CompoundTupleIterator(List<ITupleIterator> backends) {
+        Preconditions.checkArgument(backends != null && backends.size() != 0, "backends not exists");
+        this.backends = backends;
+        this.compoundIterator = Iterators.concat(backends.iterator());
+    }
+
+    @Override
+    public void close() {
+        for (ITupleIterator i : backends) {
+            i.close();
+        }
+    }
+
+    @Override
+    public boolean hasNext() {
+        return this.compoundIterator.hasNext();
+    }
+
+    @Override
+    public ITuple next() {
+        return this.compoundIterator.next();
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/metadata/src/main/java/org/apache/kylin/metadata/tuple/ITupleIterator.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/tuple/ITupleIterator.java b/metadata/src/main/java/org/apache/kylin/metadata/tuple/ITupleIterator.java
index 6966751..5235387 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/tuple/ITupleIterator.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/tuple/ITupleIterator.java
@@ -18,10 +18,12 @@
 
 package org.apache.kylin.metadata.tuple;
 
+import java.util.Iterator;
+
 /**
  * @author xjiang
  */
-public interface ITupleIterator {
+public interface ITupleIterator extends Iterator<ITuple>  {
     public static final ITupleIterator EMPTY_TUPLE_ITERATOR = new ITupleIterator() {
         @Override
         public boolean hasNext() {
@@ -34,6 +36,10 @@ public interface ITupleIterator {
         }
 
         @Override
+        public void remove() {
+        }
+
+        @Override
         public void close() {
         }
     };

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
index aaad516..04972fa 100644
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
+++ b/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
@@ -41,10 +41,8 @@ public abstract class RoutingRule {
     //1. simple query use II prior to cube
     //2. exact match prior to week match
     static {
-        rules.add(new RealizationPriorityRule());
         rules.add(new RemoveUncapableRealizationsRule());
-        rules.add(new SimpleQueryMoreColumnsCubeFirstRule());
-        rules.add(new CubesSortRule());
+        rules.add(new RealizationSortRule());
         rules.add(new AdjustForWeeklyMatchedRealization());//this rule might modify olapcontext content, better put it at last
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java
index 465d162..3402556 100644
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java
+++ b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,12 +18,6 @@
 
 package org.apache.kylin.query.routing.RoutingRules;
 
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.kylin.cube.CubeCapabilityChecker;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.model.CubeDesc;
@@ -34,9 +28,15 @@ import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.query.relnode.OLAPContext;
 import org.apache.kylin.query.routing.RoutingRule;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
 
 /**
- * Created by Hongbin Ma(Binmahone) on 1/5/15.
  */
 public class AdjustForWeeklyMatchedRealization extends RoutingRule {
     private static final Logger logger = LoggerFactory.getLogger(AdjustForWeeklyMatchedRealization.class);
@@ -46,6 +46,13 @@ public class AdjustForWeeklyMatchedRealization extends RoutingRule {
         if (realizations.size() > 0) {
             IRealization first = realizations.get(0);
 
+            if (first instanceof HybridInstance) {
+                HybridInstance hybrid = (HybridInstance) first;
+
+                if (hybrid.getHistoryRealizationInstance() instanceof CubeInstance)
+                    first = hybrid.getHistoryRealizationInstance();
+            }
+
             if (first instanceof CubeInstance) {
                 CubeInstance cube = (CubeInstance) first;
                 adjustOLAPContextIfNecessary(cube, olapContext);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java
index 6a67140..7425d62 100644
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java
+++ b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java
@@ -33,8 +33,9 @@ public class RealizationPriorityRule extends RoutingRule {
     static Map<RealizationType, Integer> priorities = Maps.newHashMap();
 
     static {
-        priorities.put(RealizationType.CUBE, 0);
-        priorities.put(RealizationType.INVERTED_INDEX, 1);
+        priorities.put(RealizationType.HYBRID, 2);
+        priorities.put(RealizationType.CUBE, 1);
+        priorities.put(RealizationType.INVERTED_INDEX, 0);
     }
 
     public static void setPriorities(Map<RealizationType, Integer> priorities) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationSortRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationSortRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationSortRule.java
new file mode 100644
index 0000000..f9a250e
--- /dev/null
+++ b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationSortRule.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query.routing.RoutingRules;
+
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.query.relnode.OLAPContext;
+import org.apache.kylin.query.routing.RoutingRule;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ */
+public class RealizationSortRule extends RoutingRule {
+    @Override
+    public void apply(List<IRealization> realizations, final OLAPContext olapContext) {
+
+        // sort cube candidates, 0) the priority 1) the cost indicator, 2) the lesser header
+        // columns the better, 3) the lesser body columns the better 4) the larger date range the better
+
+        Collections.sort(realizations, new Comparator<IRealization>() {
+            @Override
+            public int compare(IRealization o1, IRealization o2) {
+                int i1 = RealizationPriorityRule.priorities.get(o1.getType());
+                int i2 = RealizationPriorityRule.priorities.get(o2.getType());
+                int comp = i1 - i2;
+                if (comp != 0) {
+                    return comp;
+                }
+
+                comp = o1.getCost(olapContext.getSQLDigest()) - o2.getCost(olapContext.getSQLDigest());
+                if (comp != 0) {
+                    return comp;
+                }
+
+                if (o1 instanceof HybridInstance)
+                    return -1;
+                else if (o2 instanceof HybridInstance)
+                    return 1;
+
+                return 0;
+            }
+        });
+
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java
index f5b42d5..b58e439 100644
--- a/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java
+++ b/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java
@@ -24,6 +24,7 @@ import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.query.routing.RoutingRules.RealizationPriorityRule;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.collect.Maps;
@@ -40,6 +41,7 @@ public class IIQueryTest extends KylinQueryTest {
         Map<RealizationType, Integer> priorities = Maps.newHashMap();
         priorities.put(RealizationType.INVERTED_INDEX, 0);
         priorities.put(RealizationType.CUBE, 1);
+        priorities.put(RealizationType.HYBRID, 1);
         RealizationPriorityRule.setPriorities(priorities);
 
     }
@@ -51,6 +53,7 @@ public class IIQueryTest extends KylinQueryTest {
         Map<RealizationType, Integer> priorities = Maps.newHashMap();
         priorities.put(RealizationType.INVERTED_INDEX, 1);
         priorities.put(RealizationType.CUBE, 0);
+        priorities.put(RealizationType.HYBRID, 0);
         RealizationPriorityRule.setPriorities(priorities);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
index 2f2d9cf..0e0a0b1 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
 
+import org.apache.kylin.common.lock.ZookeeperJobLock;
 import org.apache.kylin.rest.exception.InternalErrorException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -82,7 +83,7 @@ public class JobController extends BasicController implements InitializingBean {
                 public void run() {
                     try {
                         DefaultScheduler scheduler = DefaultScheduler.getInstance();
-                        scheduler.init(new JobEngineConfig(kylinConfig));
+                        scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
                         if (!scheduler.hasStarted()) {
                             logger.error("scheduler has not been started");
                             System.exit(1);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java b/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
index 5eb4ea3..b81bad2 100644
--- a/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
+++ b/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
@@ -24,6 +24,8 @@ import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.storage.hbase.CubeStorageEngine;
 import org.apache.kylin.storage.hbase.InvertedIndexStorageEngine;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.apache.kylin.storage.hybrid.HybridStorageEngine;
 
 /**
  * @author xjiang
@@ -33,8 +35,10 @@ public class StorageEngineFactory {
     public static IStorageEngine getStorageEngine(IRealization realization) {
         if (realization.getType() == RealizationType.INVERTED_INDEX) {
             return new InvertedIndexStorageEngine((IIInstance) realization);
-        } else {
+        } else if (realization.getType() == RealizationType.CUBE) {
             return new CubeStorageEngine((CubeInstance) realization);
+        } else {
+            return new HybridStorageEngine((HybridInstance) realization);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
index 4589e4d..1a8decd 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
@@ -180,6 +180,11 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
         return this.tuple;
     }
 
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+
     private void scanNextRange() {
         if (this.rangeIterator.hasNext()) {
             closeScanner();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
index 8a22579..97ba981 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
@@ -122,6 +122,11 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
     }
 
     @Override
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     public void close() {
         context.setTotalScanCount(scanCount);
         segmentIterator.close();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
index 8b4d8cf..b2f07c0 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
@@ -194,6 +194,11 @@ public class EndpointTupleIterator implements ITupleIterator {
     }
 
     @Override
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     public void close() {
         IOUtils.closeQuietly(table);
         logger.info("Closed after " + rowsInAllMetric + " rows are fetched");
@@ -312,6 +317,11 @@ public class EndpointTupleIterator implements ITupleIterator {
         }
 
         @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
         public void close() {
 
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
new file mode 100644
index 0000000..1694719
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
@@ -0,0 +1,188 @@
+package org.apache.kylin.storage.hybrid;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.project.RealizationEntry;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.RealizationRegistry;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.metadata.realization.SQLDigest;
+
+import java.util.List;
+
+/**
+ */
+
+@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
+public class HybridInstance extends RootPersistentEntity implements IRealization {
+
+    @JsonIgnore
+    private KylinConfig config;
+
+    @JsonProperty("name")
+    private String name;
+
+    @JsonProperty("historyRealization")
+    private RealizationEntry historyRealization;
+
+    @JsonProperty("realTimeRealization")
+    private RealizationEntry realTimeRealization;
+
+    private IRealization historyRealizationInstance;
+    private IRealization realTimeRealizationInstance;
+    private String projectName;
+
+    public void init() {
+        RealizationRegistry registry = RealizationRegistry.getInstance(config);
+        historyRealizationInstance = registry.getRealization(historyRealization.getType(), historyRealization.getRealization());
+        realTimeRealizationInstance = registry.getRealization(realTimeRealization.getType(), realTimeRealization.getRealization());
+
+        if (historyRealizationInstance == null) {
+            throw new IllegalArgumentException("Didn't find realization '" + historyRealization.getType() + "'" + " with name '" + historyRealization.getRealization() + "' in '" + name + "'");
+        }
+
+        if (realTimeRealizationInstance == null) {
+            throw new IllegalArgumentException("Didn't find realization '" + realTimeRealization.getType() + "'" + " with name '" + realTimeRealization.getRealization() + "' in '" + name + "'");
+        }
+
+    }
+
+    @Override
+    public boolean isCapable(SQLDigest digest) {
+        return getHistoryRealizationInstance().isCapable(digest) && getRealTimeRealizationInstance().isCapable(digest);
+    }
+
+    @Override
+    public int getCost(SQLDigest digest) {
+        return historyRealizationInstance.getCost(digest);
+        //return Math.min(historyRealizationInstance.getCost(digest), realTimeRealizationInstance.getCost(digest));
+    }
+
+    @Override
+    public RealizationType getType() {
+        return RealizationType.HYBRID;
+    }
+
+    @Override
+    public String getFactTable() {
+        return getHistoryRealizationInstance().getFactTable();
+    }
+
+    @Override
+    public List<TblColRef> getAllColumns() {
+        return getHistoryRealizationInstance().getAllColumns();
+    }
+
+    @Override
+    public List<MeasureDesc> getMeasures() {
+        return getHistoryRealizationInstance().getMeasures();
+    }
+
+    @Override
+    public boolean isReady() {
+        return historyRealizationInstance.isReady() || realTimeRealizationInstance.isReady();
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    @Override
+    public String toString() {
+        return getCanonicalName();
+    }
+
+    @Override
+    public String getCanonicalName() {
+        return getType() + "[name=" + name + "]";
+    }
+
+    @Override
+    public String getProjectName() {
+        return projectName;
+    }
+
+    @Override
+    public void setProjectName(String prjName) {
+        projectName = prjName;
+    }
+
+    public KylinConfig getConfig() {
+        return config;
+    }
+
+    public void setConfig(KylinConfig config) {
+        this.config = config;
+    }
+
+    public RealizationEntry getHistoryRealization() {
+        return historyRealization;
+    }
+
+    public RealizationEntry getRealTimeRealization() {
+        return realTimeRealization;
+    }
+
+    public IRealization getHistoryRealizationInstance() {
+        if (historyRealizationInstance == null) {
+            this.init();
+        }
+        return historyRealizationInstance;
+    }
+
+    public IRealization getRealTimeRealizationInstance() {
+        if (realTimeRealizationInstance == null) {
+            this.init();
+        }
+        return realTimeRealizationInstance;
+    }
+
+    @Override
+    public long getDateRangeStart() {
+        return Math.min(getHistoryRealizationInstance().getDateRangeStart(), getRealTimeRealizationInstance().getDateRangeStart());
+    }
+
+    @Override
+    public long getDateRangeEnd() {
+        return Math.max(getHistoryRealizationInstance().getDateRangeEnd(), getRealTimeRealizationInstance().getDateRangeEnd()) +1;
+    }
+
+    
+
+    public DataModelDesc getDataModelDesc(){
+        if (getHistoryRealizationInstance() instanceof CubeInstance) {
+            return ((CubeInstance) historyRealizationInstance).getDescriptor().getModel();
+        }
+
+        return ((IIInstance) historyRealizationInstance).getDescriptor().getModel();
+    }
+
+   @Override
+    public String getModelName() {
+        if (getHistoryRealizationInstance() instanceof CubeInstance) {
+            return ((CubeInstance) historyRealizationInstance).getDescriptor().getModelName();
+        }
+
+        return ((IIInstance) historyRealizationInstance).getDescriptor().getModelName();
+    }
+
+    @Override
+    public List<TblColRef> getAllDimensions(){
+
+        return this.getHistoryRealizationInstance().getAllDimensions();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
new file mode 100644
index 0000000..df3fae3
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
@@ -0,0 +1,127 @@
+package org.apache.kylin.storage.hybrid;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.restclient.Broadcaster;
+import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.IRealizationProvider;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ */
+public class HybridManager implements IRealizationProvider {
+    public static final Serializer<HybridInstance> HYBRID_SERIALIZER = new JsonSerializer<HybridInstance>(HybridInstance.class);
+
+    private static final Logger logger = LoggerFactory.getLogger(HybridManager.class);
+
+    // static cached instances
+    private static final ConcurrentHashMap<KylinConfig, HybridManager> CACHE = new ConcurrentHashMap<KylinConfig, HybridManager>();
+
+    public static HybridManager getInstance(KylinConfig config) {
+        HybridManager r = CACHE.get(config);
+        if (r != null) {
+            return r;
+        }
+
+        synchronized (HybridManager.class) {
+            r = CACHE.get(config);
+            if (r != null) {
+                return r;
+            }
+            try {
+                r = new HybridManager(config);
+                CACHE.put(config, r);
+                if (CACHE.size() > 1) {
+                    logger.warn("More than one Hybrid Manager singleton exist");
+                }
+                return r;
+            } catch (IOException e) {
+                throw new IllegalStateException("Failed to init Hybrid Manager from " + config, e);
+            }
+        }
+    }
+
+    public static void clearCache() {
+        CACHE.clear();
+    }
+
+    // ============================================================================
+
+    private KylinConfig config;
+
+    private CaseInsensitiveStringCache<HybridInstance> hybridMap = new CaseInsensitiveStringCache<HybridInstance>(Broadcaster.TYPE.HYBRID);
+
+    private HybridManager(KylinConfig config) throws IOException {
+        logger.info("Initializing HybridManager with config " + config);
+        this.config = config;
+
+        loadAllHybridInstance();
+    }
+
+    private void loadAllHybridInstance() throws IOException {
+        ResourceStore store = getStore();
+        List<String> paths = store.collectResourceRecursively(ResourceStore.HYBRID_RESOURCE_ROOT, ".json");
+
+        logger.debug("Loading Hybrid from folder " + store.getReadableResourcePath(ResourceStore.HYBRID_RESOURCE_ROOT));
+
+        for (String path : paths) {
+            loadHybridInstance(path);
+        }
+
+        logger.debug("Loaded " + paths.size() + " Hybrid(s)");
+    }
+
+    private synchronized HybridInstance loadHybridInstance(String path) throws IOException {
+        ResourceStore store = getStore();
+
+        HybridInstance hybridInstance = null;
+        try {
+            hybridInstance = store.getResource(path, HybridInstance.class, HYBRID_SERIALIZER);
+            hybridInstance.setConfig(config);
+
+            if (StringUtils.isBlank(hybridInstance.getName()))
+                throw new IllegalStateException("HybridInstance name must not be blank, at " + path);
+
+            if (hybridInstance.getHistoryRealization() == null || hybridInstance.getRealTimeRealization() == null) {
+
+                throw new IllegalStateException("HybridInstance must have both historyRealization and realTimeRealization set, at " + path);
+            }
+
+            final String name = hybridInstance.getName();
+            hybridMap.putLocal(name, hybridInstance);
+
+            return hybridInstance;
+        } catch (Exception e) {
+            logger.error("Error during load hybrid instance " + path, e);
+            return null;
+        }
+    }
+
+    @Override
+    public RealizationType getRealizationType() {
+        return RealizationType.HYBRID;
+    }
+
+    @Override
+    public IRealization getRealization(String name) {
+        return getHybridInstance(name);
+    }
+
+    public HybridInstance getHybridInstance(String name) {
+        return hybridMap.get(name);
+    }
+
+    private ResourceStore getStore() {
+        return ResourceStore.getStore(this.config);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
new file mode 100644
index 0000000..7ad5ed0
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
@@ -0,0 +1,69 @@
+package org.apache.kylin.storage.hybrid;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Ranges;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.realization.SQLDigestUtil;
+import org.apache.kylin.metadata.tuple.CompoundTupleIterator;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.storage.IStorageEngine;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.StorageEngineFactory;
+import org.apache.kylin.storage.tuple.TupleInfo;
+
+import javax.annotation.Nullable;
+
+/**
+ */
+public class HybridStorageEngine implements IStorageEngine {
+
+    private HybridInstance hybridInstance;
+    private IStorageEngine historicalStorageEngine;
+    private IStorageEngine realtimeStorageEngine;
+
+    public HybridStorageEngine(HybridInstance hybridInstance) {
+        this.hybridInstance = hybridInstance;
+        this.historicalStorageEngine = StorageEngineFactory.getStorageEngine(this.hybridInstance.getHistoryRealizationInstance());
+        this.realtimeStorageEngine = StorageEngineFactory.getStorageEngine(this.hybridInstance.getRealTimeRealizationInstance());
+    }
+
+    @Override
+    public ITupleIterator search(final StorageContext context, final SQLDigest sqlDigest) {
+
+        // search the historic realization
+        ITupleIterator historicalDataIterator = this.historicalStorageEngine.search(context, sqlDigest);
+
+        String modelName = hybridInstance.getModelName();
+        MetadataManager metaMgr = getMetadataManager();
+        DataModelDesc modelDesc = metaMgr.getDataModelDesc(modelName);
+
+        // if the model isn't partitioned, only query the history
+        if (modelDesc.getPartitionDesc() == null || modelDesc.getPartitionDesc().getPartitionDateColumnRef() == null)
+            return historicalDataIterator;
+
+        TblColRef partitionColRef = modelDesc.getPartitionDesc().getPartitionDateColumnRef();
+
+        ITupleIterator realtimeDataIterator = SQLDigestUtil.appendTsFilterToExecute(sqlDigest, partitionColRef, //
+                Ranges.atLeast(hybridInstance.getHistoryRealizationInstance().getDateRangeEnd()),//
+                new Function<Void, ITupleIterator>() {
+                    @Nullable
+                    @Override
+                    public ITupleIterator apply(@Nullable Void input) {
+                        return realtimeStorageEngine.search(context, sqlDigest);
+                    }
+                });
+
+        // combine history and real-time tuple iterator
+        return new CompoundTupleIterator(Lists.newArrayList(historicalDataIterator, realtimeDataIterator));
+    }
+
+
+    private MetadataManager getMetadataManager() {
+        return MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java
index 87c6445..b98b09b 100644
--- a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java
@@ -36,6 +36,7 @@ import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
 import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/TableRecordInfoTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/TableRecordInfoTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/TableRecordInfoTest.java
index c4f785a..6ec9385 100644
--- a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/TableRecordInfoTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/TableRecordInfoTest.java
@@ -25,6 +25,7 @@ import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/storage/src/test/java/org/apache/kylin/storage/hybrid/HybridManagerTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hybrid/HybridManagerTest.java b/storage/src/test/java/org/apache/kylin/storage/hybrid/HybridManagerTest.java
new file mode 100644
index 0000000..3ad7ca6
--- /dev/null
+++ b/storage/src/test/java/org/apache/kylin/storage/hybrid/HybridManagerTest.java
@@ -0,0 +1,45 @@
+package org.apache.kylin.storage.hybrid;
+
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ */
+public class HybridManagerTest extends LocalFileMetadataTestCase {
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testBasics() throws Exception {
+        HybridInstance cube = getHybridManager().getHybridInstance("test_kylin_hybrid_left_join");
+        cube.init();
+        System.out.println(JsonUtil.writeValueAsIndentString(cube));
+
+        IRealization history = cube.getHistoryRealizationInstance();
+        IRealization realTime = cube.getRealTimeRealizationInstance();
+
+        Assert.assertTrue(history instanceof CubeInstance);
+        Assert.assertTrue(realTime instanceof IIInstance);
+
+    }
+
+
+    public HybridManager getHybridManager() {
+        return HybridManager.getInstance(getTestConfig());
+    }
+}