You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/05/15 07:21:38 UTC

[09/52] [abbrv] incubator-kylin git commit: KYLIN-759 refactor storage layer cache

KYLIN-759 refactor storage layer cache


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/bc426cc0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/bc426cc0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/bc426cc0

Branch: refs/heads/0.8.0
Commit: bc426cc008e4424c81c62e7541cd244ac9ad811b
Parents: 6924057
Author: honma <ho...@ebay.com>
Authored: Wed May 13 16:59:45 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu May 14 20:22:04 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/util/RangeUtil.java |  14 +-
 .../org/apache/kylin/common/util/BasicTest.java |   3 +
 .../metadata/tuple/CompoundTupleIterator.java   |  25 +--
 .../kylin/metadata/tuple/ITupleIterator.java    |  14 --
 .../metadata/tuple/SimpleTupleIterator.java     |   6 -
 .../kylin/metadata/tuple/TeeTupleIterator.java  |  26 +--
 .../metadata/tuple/TeeTupleItrListener.java     |  10 +
 .../kylin/query/enumerator/OLAPEnumerator.java  |   2 +-
 .../apache/kylin/storage/IStorageEngine.java    |  23 +++
 .../kylin/storage/StorageEngineFactory.java     |  51 +++--
 .../AbstractCacheFledgedStorageEngine.java      |  49 +++++
 .../cache/CacheFledgedDynamicStorageEngine.java | 148 +++++++++++++++
 .../cache/CacheFledgedStaticStorageEngine.java  |  80 ++++++++
 .../cache/CacheFledgedStorageEngine.java        | 186 -------------------
 .../kylin/storage/cache/StorageLayerCache.java  |   7 -
 .../kylin/storage/cache/StreamSQLResult.java    |  44 +++--
 .../kylin/storage/cube/CubeStorageEngine.java   |  22 ++-
 .../cube/SerializedCubeTupleIterator.java       |   9 +-
 .../storage/hbase/CubeSegmentTupleIterator.java |  19 +-
 .../kylin/storage/hbase/CubeStorageEngine.java  |  47 +++--
 .../hbase/InvertedIndexStorageEngine.java       |  19 +-
 .../hbase/SerializedHBaseTupleIterator.java     |   4 -
 .../endpoint/EndpointTupleIterator.java         |  21 ++-
 .../endpoint/TsConditionExtractor.java          |   9 +-
 .../storage/hybrid/HybridStorageEngine.java     |  38 ++--
 .../apache/kylin/storage/test/StorageTest.java  |  15 +-
 26 files changed, 512 insertions(+), 379 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bc426cc0/common/src/main/java/org/apache/kylin/common/util/RangeUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/RangeUtil.java b/common/src/main/java/org/apache/kylin/common/util/RangeUtil.java
index 33d6770..05e4d35 100644
--- a/common/src/main/java/org/apache/kylin/common/util/RangeUtil.java
+++ b/common/src/main/java/org/apache/kylin/common/util/RangeUtil.java
@@ -1,11 +1,11 @@
 package org.apache.kylin.common.util;
 
+import com.google.common.collect.*;
+
 import java.util.Collections;
 import java.util.List;
 import java.util.NavigableMap;
 
-import com.google.common.collect.*;
-
 /**
  */
 public class RangeUtil {
@@ -44,6 +44,16 @@ public class RangeUtil {
         return range.upperBoundType() == BoundType.CLOSED;
     }
 
+    public static <C extends Comparable<?>> Range<C> merge(Range<C> a, Range<C> b) {
+        if (a == null && b == null) {
+            return null;
+        } else if (a == null || b == null) {
+            return a == null ? b : a;
+        } else {
+            return a.span(b);
+        }
+    }
+
     /**
      * remove from self the elements that exist in other
      * @return

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bc426cc0/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 2f4943d..e0226f8 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -62,6 +62,9 @@ public class BasicTest {
     @Test
     @Ignore("convenient trial tool for dev")
     public void test1() throws Exception {
+        Number xx =new Long(0L);
+        System.out.println(xx.getClass().getName());
+
         System.out.println(time(1367798400000L));
 
         System.out.println(org.apache.kylin.common.util.DateFormat.formatToTimeStr(1000L * Integer.MAX_VALUE));

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bc426cc0/metadata/src/main/java/org/apache/kylin/metadata/tuple/CompoundTupleIterator.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/tuple/CompoundTupleIterator.java b/metadata/src/main/java/org/apache/kylin/metadata/tuple/CompoundTupleIterator.java
index b87c633..f5d8dd6 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/tuple/CompoundTupleIterator.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/tuple/CompoundTupleIterator.java
@@ -1,14 +1,13 @@
 package org.apache.kylin.metadata.tuple;
 
-import java.util.Iterator;
-import java.util.List;
-
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
-import com.google.common.collect.Range;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Iterator;
+import java.util.List;
+
 /**
  */
 public class CompoundTupleIterator implements ITupleIterator {
@@ -30,24 +29,6 @@ public class CompoundTupleIterator implements ITupleIterator {
     }
 
     @Override
-    public Range<Long> getCacheExcludedPeriod() {
-        Range<Long> timeSpan = null;
-        for (ITupleIterator itt : this.backends) {
-            Range<Long> excluded = itt.getCacheExcludedPeriod();
-            if (excluded != null && !excluded.isEmpty()) {
-                if (timeSpan == null) {
-                    //first one
-                    timeSpan = excluded;
-                } else {
-                    logger.warn("There are two excluded period, use a span to replace them: " + timeSpan + " and " + excluded );
-                    timeSpan = timeSpan.span(excluded);
-                }
-            }
-        }
-        return timeSpan;
-    }
-
-    @Override
     public boolean hasNext() {
         return this.compoundIterator.hasNext();
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bc426cc0/metadata/src/main/java/org/apache/kylin/metadata/tuple/ITupleIterator.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/tuple/ITupleIterator.java b/metadata/src/main/java/org/apache/kylin/metadata/tuple/ITupleIterator.java
index 9e92eb9..6bab137 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/tuple/ITupleIterator.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/tuple/ITupleIterator.java
@@ -20,8 +20,6 @@ package org.apache.kylin.metadata.tuple;
 
 import java.util.Iterator;
 
-import com.google.common.collect.Range;
-
 /**
  * @author xjiang
  *
@@ -46,22 +44,10 @@ public interface ITupleIterator extends Iterator<ITuple> {
         @Override
         public void close() {
         }
-
-        @Override
-        public Range<Long> getCacheExcludedPeriod() {
-            return null;
-        }
     };
 
     void close();
 
-    /**
-     * tells storage layer cache what time period of data should not be cached.
-     * for static storage like cube, it will return null
-     * for dynamic storage like ii, it will for example exclude the last two minutes for possible data latency
-     * @return
-     */
-    Range<Long> getCacheExcludedPeriod();
 
     /**
      * if hasNext() returns false because there's no more data, return true

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bc426cc0/metadata/src/main/java/org/apache/kylin/metadata/tuple/SimpleTupleIterator.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/tuple/SimpleTupleIterator.java b/metadata/src/main/java/org/apache/kylin/metadata/tuple/SimpleTupleIterator.java
index ebdcb5d..92006e1 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/tuple/SimpleTupleIterator.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/tuple/SimpleTupleIterator.java
@@ -2,8 +2,6 @@ package org.apache.kylin.metadata.tuple;
 
 import java.util.Iterator;
 
-import com.google.common.collect.Range;
-
 /**
  *
  */
@@ -34,8 +32,4 @@ public class SimpleTupleIterator implements ITupleIterator {
     public void close() {
     }
 
-    @Override
-    public Range<Long> getCacheExcludedPeriod() {
-        return null;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bc426cc0/metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleIterator.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleIterator.java b/metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleIterator.java
index 556fca6..0618b45 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleIterator.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleIterator.java
@@ -1,10 +1,8 @@
 package org.apache.kylin.metadata.tuple;
 
-import java.util.List;
-
-import com.google.common.base.Function;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Range;
+
+import java.util.List;
 
 /**
  *
@@ -13,29 +11,21 @@ import com.google.common.collect.Range;
  */
 public class TeeTupleIterator implements ITupleIterator {
 
-    private Function<List<ITuple>, Void> actionOnSeeingWholeData;
     private ITupleIterator underlying;
     private List<ITuple> duplicatedData;
+    private List<TeeTupleItrListener> listeners = Lists.newArrayList();
 
     public TeeTupleIterator(ITupleIterator underlying) {
         this.underlying = underlying;
         this.duplicatedData = Lists.newArrayList();
     }
 
-    public void setActionOnSeeingWholeData(Function<List<ITuple>, Void> actionOnSeeingWholeData) {
-        this.actionOnSeeingWholeData = actionOnSeeingWholeData;
-    }
-
     @Override
     public void close() {
         this.underlying.close();
-        //if(this.underlying.isDrained)
-        actionOnSeeingWholeData.apply(duplicatedData);
-    }
-
-    @Override
-    public Range<Long> getCacheExcludedPeriod() {
-        return this.underlying.getCacheExcludedPeriod();
+        for (TeeTupleItrListener listener : this.listeners) {
+            listener.notify(this.duplicatedData);
+        }
     }
 
     @Override
@@ -54,4 +44,8 @@ public class TeeTupleIterator implements ITupleIterator {
     public void remove() {
         this.underlying.remove();
     }
+
+    public void addCloseListener(TeeTupleItrListener listener) {
+        this.listeners.add(listener);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bc426cc0/metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleItrListener.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleItrListener.java b/metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleItrListener.java
new file mode 100644
index 0000000..50b94bd
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleItrListener.java
@@ -0,0 +1,10 @@
+package org.apache.kylin.metadata.tuple;
+
+import java.util.List;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 5/13/15.
+ */
+public interface TeeTupleItrListener {
+    void notify(List<ITuple> duplicated);
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bc426cc0/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
index a5019ab..63f06f1 100644
--- a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
@@ -112,7 +112,7 @@ public class OLAPEnumerator implements Enumerator<Object[]> {
         olapContext.resetSQLDigest();
 
         // query storage engine
-        IStorageEngine storageEngine = StorageEngineFactory.getStorageEngine(olapContext.realization, true);
+        IStorageEngine storageEngine = StorageEngineFactory.getStorageEngine(olapContext.realization);
         ITupleIterator iterator = storageEngine.search(olapContext.storageContext, olapContext.getSQLDigest(), olapContext.returnTupleInfo);
         if (logger.isDebugEnabled()) {
             logger.debug("return TupleIterator...");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bc426cc0/storage/src/main/java/org/apache/kylin/storage/IStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/IStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/IStorageEngine.java
index 9a88676..d6b777a 100644
--- a/storage/src/main/java/org/apache/kylin/storage/IStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/IStorageEngine.java
@@ -22,6 +22,8 @@ import org.apache.kylin.metadata.realization.SQLDigest;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
 import org.apache.kylin.storage.tuple.TupleInfo;
 
+import com.google.common.collect.Range;
+
 /**
  * 
  * @author xjiang
@@ -31,4 +33,25 @@ public interface IStorageEngine {
 
     ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo);
 
+    /**
+     *
+     * being dynamic => getVolatilePeriod() returns non-null
+     * being dynamic => partition column of its realization is non-null
+     *
+     * @return true for dynamic storage like II
+     *          false for static storage like cubes
+     */
+    boolean isDynamic();
+
+    /**
+     * volatile period is the period of time in which the returned data is not stable
+     * e.g. inverted index's last several minutes' data is dynamic as time goes by.
+     * data in this period cannot be cached
+     *
+     * This method should not be called before ITupleIterator.close() is called
+     *
+     * @return null if the underlying storage guarantees the data is static
+     */
+    Range<Long> getVolatilePeriod();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bc426cc0/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java b/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
index 4d150ad..29a53fd 100644
--- a/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
+++ b/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
@@ -18,37 +18,66 @@
 
 package org.apache.kylin.storage;
 
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.storage.cache.CacheFledgedStorageEngine;
+import org.apache.kylin.storage.cache.CacheFledgedDynamicStorageEngine;
+import org.apache.kylin.storage.cache.CacheFledgedStaticStorageEngine;
 import org.apache.kylin.storage.hbase.CubeStorageEngine;
 import org.apache.kylin.storage.hbase.InvertedIndexStorageEngine;
 import org.apache.kylin.storage.hybrid.HybridInstance;
 import org.apache.kylin.storage.hybrid.HybridStorageEngine;
 
+import com.google.common.base.Preconditions;
+
 /**
  * @author xjiang
  */
 public class StorageEngineFactory {
+    private static boolean allowStorageLayerCache = true;
 
-    public static IStorageEngine getStorageEngine(IRealization realization, boolean allowCache) {
-        if (realization.getType() == RealizationType.CUBE) {
-            allowCache = false;
-        }
-        
-        if (allowCache) {
-            return new CacheFledgedStorageEngine(realization);
-        }
+    public static IStorageEngine getStorageEngine(IRealization realization) {
 
         if (realization.getType() == RealizationType.INVERTED_INDEX) {
-            return new InvertedIndexStorageEngine((IIInstance) realization);
+            IStorageEngine ret = new InvertedIndexStorageEngine((IIInstance) realization);
+            if (allowStorageLayerCache) {
+                return wrapWithCache(ret, realization);
+            } else {
+                return ret;
+            }
         } else if (realization.getType() == RealizationType.CUBE) {
-            return new CubeStorageEngine((CubeInstance) realization);
+            IStorageEngine ret = new CubeStorageEngine((CubeInstance) realization);
+            if (allowStorageLayerCache) {
+                return wrapWithCache(ret, realization);
+            } else {
+                return ret;
+            }
         } else {
             return new HybridStorageEngine((HybridInstance) realization);
         }
     }
 
+    private static IStorageEngine wrapWithCache(IStorageEngine underlyingStorageEngine, IRealization realization) {
+        if (underlyingStorageEngine.isDynamic()) {
+            return new CacheFledgedDynamicStorageEngine(underlyingStorageEngine, getPartitionCol(realization));
+        } else {
+            return new CacheFledgedStaticStorageEngine(underlyingStorageEngine);
+        }
+    }
+
+    private static TblColRef getPartitionCol(IRealization realization) {
+        String modelName = realization.getModelName();
+        DataModelDesc dataModelDesc = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getDataModelDesc(modelName);
+        PartitionDesc partitionDesc = dataModelDesc.getPartitionDesc();
+        Preconditions.checkArgument(partitionDesc != null, "PartitionDesc for " + realization + " is null!");
+        TblColRef partitionColRef = partitionDesc.getPartitionDateColumnRef();
+        Preconditions.checkArgument(partitionColRef != null, "getPartitionDateColumnRef for " + realization + " is null");
+        return partitionColRef;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bc426cc0/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
new file mode 100644
index 0000000..d8e2fc0
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
@@ -0,0 +1,49 @@
+package org.apache.kylin.storage.cache;
+
+import net.sf.ehcache.Cache;
+import net.sf.ehcache.CacheManager;
+import net.sf.ehcache.config.CacheConfiguration;
+import net.sf.ehcache.config.MemoryUnit;
+import net.sf.ehcache.config.PersistenceConfiguration;
+import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
+
+import org.apache.kylin.metadata.realization.StreamSQLDigest;
+import org.apache.kylin.storage.IStorageEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 5/13/15.
+ */
+public abstract class AbstractCacheFledgedStorageEngine {
+    private static final Logger logger = LoggerFactory.getLogger(AbstractCacheFledgedStorageEngine.class);
+    protected static CacheManager cacheManager = CacheManager.create();
+
+    protected final IStorageEngine underlyingStorage;
+    protected StreamSQLDigest streamSQLDigest;
+    protected boolean queryCacheExists;
+
+    public AbstractCacheFledgedStorageEngine(IStorageEngine underlyingStorage) {
+        this.underlyingStorage = underlyingStorage;
+        this.queryCacheExists = false;
+        this.makeCacheIfNecessary(underlyingStorage.getClass().getName());
+    }
+
+    private void makeCacheIfNecessary(String storageClassName) {
+        if (cacheManager.getCache(storageClassName) == null) {
+            logger.info("Cache for {} initting:", storageClassName);
+            // TODO: L4J [2015-04-20 10:44:03,817][WARN][net.sf.ehcache.pool.sizeof.ObjectGraphWalker] - The configured limit of 1,000 object references was reached while attempting to calculate the size of the object graph. Severe performance degradation could occur if the sizing operation continues. This can be avoided by setting the CacheManger or Cache <sizeOfPolicy> elements maxDepthExceededBehavior to "abort" or adding stop points with @IgnoreSizeOf annotations. If performance degradation is NOT an issue at the configured limit, raise the limit value using the CacheManager or Cache <sizeOfPolicy
+            //Create a Cache specifying its configuration.
+            Cache storageCache = new Cache(new CacheConfiguration(storageClassName, 0).//
+                    memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU).//
+                    eternal(false).//
+                    timeToIdleSeconds(86400).//
+                    diskExpiryThreadIntervalSeconds(0).//
+                    maxBytesLocalHeap(256, MemoryUnit.MEGABYTES).//
+                    persistence(new PersistenceConfiguration().strategy(PersistenceConfiguration.Strategy.NONE)));
+            //TODO: deal with failed queries, and only cache long-running queries
+
+            cacheManager.addCache(storageCache);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bc426cc0/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicStorageEngine.java
new file mode 100644
index 0000000..0d7ab18
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicStorageEngine.java
@@ -0,0 +1,148 @@
+package org.apache.kylin.storage.cache;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+import net.sf.ehcache.Cache;
+import net.sf.ehcache.Element;
+import org.apache.kylin.common.util.RangeUtil;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.realization.SQLDigestUtil;
+import org.apache.kylin.metadata.realization.StreamSQLDigest;
+import org.apache.kylin.metadata.tuple.*;
+import org.apache.kylin.storage.IStorageEngine;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.hbase.coprocessor.endpoint.TsConditionExtractor;
+import org.apache.kylin.storage.tuple.TupleInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 5/11/15.
+ */
+public class CacheFledgedDynamicStorageEngine extends AbstractCacheFledgedStorageEngine implements IStorageEngine, TeeTupleItrListener {
+    private static final Logger logger = LoggerFactory.getLogger(CacheFledgedDynamicStorageEngine.class);
+
+    private final TblColRef partitionColRef;
+
+    private Range<Long> ts;
+
+    public CacheFledgedDynamicStorageEngine(IStorageEngine underlyingStorage, TblColRef partitionColRef) {
+        super(underlyingStorage);
+        this.partitionColRef = partitionColRef;
+
+        Preconditions.checkArgument(this.partitionColRef != null, "For dynamic columns like " + //
+                this.underlyingStorage.getClass().getName() + ", partition column must be provided");
+    }
+
+    @Override
+    public ITupleIterator search(final StorageContext context, final SQLDigest sqlDigest, final TupleInfo returnTupleInfo) {
+        //if the query groups by the partition column, the cache requires updating after the query is done.
+        boolean needUpdateCache = sqlDigest.groupbyColumns.contains(partitionColRef);
+
+        streamSQLDigest = new StreamSQLDigest(sqlDigest, partitionColRef);
+        StreamSQLResult cachedResult = null;
+        Cache cache = cacheManager.getCache(this.underlyingStorage.getClass().getName());
+        Element element = cache.get(streamSQLDigest);
+        if (element != null) {
+            this.queryCacheExists = true;
+            cachedResult = (StreamSQLResult) element.getObjectValue();
+        }
+
+        ts = TsConditionExtractor.extractTsCondition(partitionColRef, sqlDigest.filter);
+        if (ts == null || ts.isEmpty()) {
+            logger.info("ts range in the query conflicts,return empty directly");
+            return ITupleIterator.EMPTY_TUPLE_ITERATOR;
+        }
+
+        ITupleIterator ret = null;
+        if (cachedResult != null) {
+            Range<Long> reusePeriod = cachedResult.getReusableResults(ts);
+
+            logger.info("existing cache    : " + cachedResult);
+            logger.info("ts Range in query: " + RangeUtil.formatTsRange(ts));
+            logger.info("potential reusable range   : " + RangeUtil.formatTsRange(reusePeriod));
+
+            if (reusePeriod != null) {
+                List<Range<Long>> remainings = RangeUtil.remove(ts, reusePeriod);
+                if (remainings.size() == 1) {//if using cache causes two underlyingStorage searches, we'd rather not use the cache
+
+                    SimpleTupleIterator reusedTuples = new SimpleTupleIterator(cachedResult.reuse(reusePeriod));
+                    List<ITupleIterator> iTupleIteratorList = Lists.newArrayList();
+                    iTupleIteratorList.add(reusedTuples);
+
+                    for (Range<Long> remaining : remainings) {
+                        logger.info("Appending ts " + RangeUtil.formatTsRange(remaining) + " as additional filter");
+
+                        ITupleIterator freshTuples = SQLDigestUtil.appendTsFilterToExecute(sqlDigest, partitionColRef, remaining, new Function<Void, ITupleIterator>() {
+                            @Override
+                            public ITupleIterator apply(Void input) {
+                                return underlyingStorage.search(context, sqlDigest, returnTupleInfo);
+                            }
+                        });
+                        iTupleIteratorList.add(freshTuples);
+                    }
+
+                    ret = new CompoundTupleIterator(iTupleIteratorList);
+                } else if (remainings.size() == 0) {
+                    needUpdateCache = false;
+                    ret = new SimpleTupleIterator(cachedResult.reuse(reusePeriod));
+                } else {
+                    //if using cache causes two underlyingStorage searches, we'd rather not use the cache
+                }
+            }
+        } else {
+            logger.info("no cache entry for this query");
+        }
+
+        if (ret == null) {
+            logger.info("decision: not using cache");
+            ret = underlyingStorage.search(context, sqlDigest, returnTupleInfo);
+        } else {
+            logger.info("decision: use cache");
+        }
+
+        if (needUpdateCache || !queryCacheExists) {
+            //use another nested ITupleIterator to deal with cache
+            final TeeTupleIterator tee = new TeeTupleIterator(ret);
+            tee.addCloseListener(this);
+            return tee;
+        } else {
+            return ret;
+        }
+    }
+
+    @Override
+    public Range<Long> getVolatilePeriod() {
+        return underlyingStorage.getVolatilePeriod();
+    }
+
+    @Override
+    public boolean isDynamic() {
+        return true;
+    }
+
+    @Override
+    public void notify(List<ITuple> duplicated) {
+        Range<Long> cacheExclude = this.underlyingStorage.getVolatilePeriod();
+        if (cacheExclude != null) {
+            List<Range<Long>> cachablePeriods = RangeUtil.remove(ts, cacheExclude);
+            if (cachablePeriods.size() == 1) {
+                if (!ts.equals(cachablePeriods.get(0))) {
+                    logger.info("With respect to each shard's build status, the cacheable tsRange shrinks from " + RangeUtil.formatTsRange(ts) + " to " + RangeUtil.formatTsRange(cachablePeriods.get(0)));
+                }
+                ts = cachablePeriods.get(0);
+            } else {
+                //NOTE(review): the intent is to give up updating the cache here (to avoid complicating it), but control still falls through to the cache put below
+            }
+        }
+
+        StreamSQLResult newCacheEntry = new StreamSQLResult(duplicated, ts, partitionColRef);
+        cacheManager.getCache(this.underlyingStorage.getClass().getName()).put(new Element(streamSQLDigest, newCacheEntry));
+        logger.info("cache after the query: " + newCacheEntry);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bc426cc0/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticStorageEngine.java
new file mode 100644
index 0000000..a4aceed
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticStorageEngine.java
@@ -0,0 +1,80 @@
+package org.apache.kylin.storage.cache;
+
+import com.google.common.collect.Range;
+import com.google.common.collect.Ranges;
+import net.sf.ehcache.Cache;
+import net.sf.ehcache.Element;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.realization.StreamSQLDigest;
+import org.apache.kylin.metadata.tuple.*;
+import org.apache.kylin.storage.IStorageEngine;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.tuple.TupleInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Cache-aware wrapper around a storage engine that reports {@link #isDynamic()} as {@code false}.
+ * <p>
+ * A query is looked up in the ehcache cache named after the underlying engine's class, keyed by a
+ * {@link StreamSQLDigest} built without a partition column. On a hit the cached tuples are replayed;
+ * on a miss the underlying engine is queried and its result is teed so that {@link #notify(List)}
+ * can store it.
+ * <p>
+ * NOTE(review): {@code streamSQLDigest} and {@code queryCacheExists} are inherited instance fields written
+ * by search() and read by notify(); this looks like one-instance-per-query usage - confirm before sharing
+ * an instance between concurrent queries.
+ * <p>
+ * Created by Hongbin Ma(Binmahone) on 5/11/15.
+ */
+public class CacheFledgedStaticStorageEngine extends AbstractCacheFledgedStorageEngine implements IStorageEngine, TeeTupleItrListener {
+    private static final Logger logger = LoggerFactory.getLogger(CacheFledgedStaticStorageEngine.class);
+
+    /**
+     * @param underlyingStorage the engine that actually answers queries on a cache miss
+     */
+    public CacheFledgedStaticStorageEngine(IStorageEngine underlyingStorage) {
+        super(underlyingStorage);
+    }
+
+    /**
+     * Answers the query from the cache when an entry exists, otherwise from the underlying engine.
+     *
+     * @return on a hit, an iterator over the cached tuples; on a miss, a TeeTupleIterator around the
+     *         underlying result with this engine registered as its close listener
+     */
+    @Override
+    public ITupleIterator search(final StorageContext context, final SQLDigest sqlDigest, final TupleInfo returnTupleInfo) {
+
+        streamSQLDigest = new StreamSQLDigest(sqlDigest, null);
+        StreamSQLResult cachedResult = null;
+        Cache cache = cacheManager.getCache(this.underlyingStorage.getClass().getName());
+        Element element = cache.get(streamSQLDigest);
+        if (element != null) {
+            cachedResult = (StreamSQLResult) element.getObjectValue();
+        }
+        // Recompute the flag on every search so that a hit on an earlier query cannot leak into this one.
+        this.queryCacheExists = cachedResult != null;
+
+        if (cachedResult != null) {
+            logger.info("decision: use cache");
+            // Served entirely from the cache: there is nothing new to record, so no tee is needed.
+            return new SimpleTupleIterator(cachedResult.reuse(Ranges.<Long> all()));
+        }
+
+        logger.info("no cache entry for this query");
+        logger.info("decision: not using cache");
+        ITupleIterator ret = underlyingStorage.search(context, sqlDigest, returnTupleInfo);
+
+        // Tee only on a miss. The previous condition teed on a hit instead, which meant the cache was
+        // never populated and therefore never hit.
+        final TeeTupleIterator tee = new TeeTupleIterator(ret);
+        tee.addCloseListener(this);
+        return tee;
+    }
+
+    /**
+     * @return the volatile period reported by the underlying engine, unchanged
+     */
+    @Override
+    public Range<Long> getVolatilePeriod() {
+        return underlyingStorage.getVolatilePeriod();
+    }
+
+    /**
+     * @return always {@code false} for this wrapper
+     */
+    @Override
+    public boolean isDynamic() {
+        return false;
+    }
+
+    /**
+     * Listener callback of the TeeTupleIterator created in search(): stores the duplicated tuples as the
+     * cache entry for the digest of the last search, covering the unbounded time range and using no
+     * partition column.
+     *
+     * @param duplicated the tuples collected while the query result was being iterated
+     */
+    @Override
+    public void notify(List<ITuple> duplicated) {
+        try {
+            StreamSQLResult newCacheEntry = new StreamSQLResult(duplicated, Ranges.<Long> all(), null);
+            cacheManager.getCache(this.underlyingStorage.getClass().getName()).put(new Element(streamSQLDigest, newCacheEntry));
+            logger.info("cache after the query: " + newCacheEntry);
+        } catch (RuntimeException e) {
+            // Cache population is best-effort: the rows have already been handed to the caller, so a
+            // failure to build or store the entry is logged instead of propagating out of the listener.
+            logger.warn("failed to update storage layer cache, the query result is unaffected", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bc426cc0/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStorageEngine.java
deleted file mode 100644
index 068619e..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStorageEngine.java
+++ /dev/null
@@ -1,186 +0,0 @@
-package org.apache.kylin.storage.cache;
-
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-import net.sf.ehcache.Cache;
-import net.sf.ehcache.CacheManager;
-import net.sf.ehcache.Element;
-import net.sf.ehcache.config.CacheConfiguration;
-import net.sf.ehcache.config.MemoryUnit;
-import net.sf.ehcache.config.PersistenceConfiguration;
-import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.RangeUtil;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.PartitionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.realization.SQLDigestUtil;
-import org.apache.kylin.metadata.realization.StreamSQLDigest;
-import org.apache.kylin.metadata.tuple.CompoundTupleIterator;
-import org.apache.kylin.metadata.tuple.ITuple;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.metadata.tuple.SimpleTupleIterator;
-import org.apache.kylin.metadata.tuple.TeeTupleIterator;
-import org.apache.kylin.storage.IStorageEngine;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.StorageEngineFactory;
-import org.apache.kylin.storage.hbase.coprocessor.endpoint.TsConditionExtractor;
-import org.apache.kylin.storage.tuple.TupleInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Range;
-
-/**
- */
-public class CacheFledgedStorageEngine implements IStorageEngine {
-
-    private static final Logger logger = LoggerFactory.getLogger(CacheFledgedStorageEngine.class);
-
-    public static final String STORAGE_LAYER_TUPLE_CACHE = "STORAGE_LAYER_TUPLE_CACHE";
-    //TODO: deal with failed queries
-
-    static CacheManager cacheManager;
-
-    static {
-        // TODO: L4J [2015-04-20 10:44:03,817][WARN][net.sf.ehcache.pool.sizeof.ObjectGraphWalker] - The configured limit of 1,000 object references was reached while attempting to calculate the size of the object graph. Severe performance degradation could occur if the sizing operation continues. This can be avoided by setting the CacheManger or Cache <sizeOfPolicy> elements maxDepthExceededBehavior to "abort" or adding stop points with @IgnoreSizeOf annotations. If performance degradation is NOT an issue at the configured limit, raise the limit value using the CacheManager or Cache <sizeOfPolicy
-        cacheManager = CacheManager.create();
-
-        //Create a Cache specifying its configuration.
-        Cache successCache = new Cache(new CacheConfiguration(STORAGE_LAYER_TUPLE_CACHE, 0).//
-                memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU).//
-                eternal(false).//
-                timeToIdleSeconds(86400).//
-                diskExpiryThreadIntervalSeconds(0).//
-                maxBytesLocalHeap(1, MemoryUnit.GIGABYTES).//
-                persistence(new PersistenceConfiguration().strategy(PersistenceConfiguration.Strategy.NONE)));
-
-        cacheManager.addCache(successCache);
-    }
-
-    private TblColRef partitionColRef;
-    private IRealization realization;
-
-    public CacheFledgedStorageEngine(IRealization realization) {
-        this.realization = realization;
-
-        Preconditions.checkArgument(realization.getType() != RealizationType.CUBE, "Cube realization does not need dynamic cache!");
-        String modelName = realization.getModelName();
-        DataModelDesc dataModelDesc = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getDataModelDesc(modelName);
-        PartitionDesc partitionDesc = dataModelDesc.getPartitionDesc();
-        Preconditions.checkArgument(partitionDesc != null, "PartitionDesc for " + realization + " is null!");
-        assert partitionDesc != null;
-        partitionColRef = partitionDesc.getPartitionDateColumnRef();
-        Preconditions.checkArgument(partitionColRef != null, "getPartitionDateColumnRef for " + realization + " is null");
-    }
-
-    @Override
-    public ITupleIterator search(final StorageContext context, final SQLDigest sqlDigest, final TupleInfo returnTupleInfo) {
-
-        //enable storage layer cache iff ts column is contained in filter
-        boolean needUpdateCache = sqlDigest.groupbyColumns.contains(partitionColRef);
-
-        final StreamSQLDigest streamSQLDigest = new StreamSQLDigest(sqlDigest, partitionColRef);
-        StreamSQLResult cachedResult = null;
-        Cache cache = cacheManager.getCache(STORAGE_LAYER_TUPLE_CACHE);
-        Element element = cache.get(streamSQLDigest);
-        if (element != null) {
-            cachedResult = (StreamSQLResult) element.getObjectValue();
-        }
-
-        Range<Long> ts = TsConditionExtractor.extractTsCondition(partitionColRef, sqlDigest.filter);
-        if (ts == null || ts.isEmpty()) {
-            logger.info("ts range in the query conflicts,return empty directly");
-            return ITupleIterator.EMPTY_TUPLE_ITERATOR;
-        }
-
-        ITupleIterator ret = null;
-        if (cachedResult != null) {
-            logger.debug("existing cache    : " + cachedResult);
-            Range<Long> reusePeriod = cachedResult.getReusableResults(ts);
-
-            logger.info("ts Range in query: " + RangeUtil.formatTsRange(ts));
-            logger.info("potential reusable range   : " + RangeUtil.formatTsRange(reusePeriod));
-
-            if (reusePeriod != null) {
-
-                List<Range<Long>> remainings = RangeUtil.remove(ts, reusePeriod);
-                if (remainings.size() == 1) {
-
-                    SimpleTupleIterator reusedTuples = new SimpleTupleIterator(cachedResult.reuse(reusePeriod));
-                    Range<Long> remaining = remainings.get(0);
-                    logger.info("Appending ts " + RangeUtil.formatTsRange(remaining) + " as additional filter");
-                    ITupleIterator freshTuples = SQLDigestUtil.appendTsFilterToExecute(sqlDigest, partitionColRef, remaining, new Function<Void, ITupleIterator>() {
-                        @Override
-                        public ITupleIterator apply(Void input) {
-                            return StorageEngineFactory.getStorageEngine(realization, false).search(context, sqlDigest, returnTupleInfo);
-                        }
-                    });
-
-                    ret = new CompoundTupleIterator(Lists.newArrayList(reusedTuples, freshTuples));
-                } else if (remainings.size() == 0) {
-                    needUpdateCache = false;
-                    ret = new SimpleTupleIterator(cachedResult.reuse(reusePeriod));
-                }
-                //if remaining size > 1, we skip using cache , i.e, ret will == null
-            }
-        } else {
-            logger.info("no cache entry for this query");
-        }
-
-        if (ret == null) {
-            logger.info("decision: not using cache");
-            //cache cannot reuse case:
-            ret = StorageEngineFactory.getStorageEngine(realization, false).search(context, sqlDigest, returnTupleInfo);
-        } else {
-            logger.info("decision: use cache");
-        }
-
-        if (needUpdateCache) {
-            //the tsRange in cache should reflect data aliveness
-            final Range<Long> finalTs = ts;
-
-            //use another nested ITupleIterator to deal with cache
-            final TeeTupleIterator tee = new TeeTupleIterator(ret);
-            tee.setActionOnSeeingWholeData(new Function<List<ITuple>, Void>() {
-                @Nullable
-                @Override
-                public Void apply(List<ITuple> input) {
-                    Range<Long> tsRange = finalTs;
-                    Range<Long> cacheExclude = tee.getCacheExcludedPeriod();
-                    if (cacheExclude != null) {
-                        List<Range<Long>> cachablePeriods = RangeUtil.remove(tsRange, cacheExclude);
-                        if (cachablePeriods.size() == 1) {
-                            if (!tsRange.equals(cachablePeriods.get(0))) {
-                                logger.info("With respect to each shard's build status, the cacheable tsRange shrinks from " + RangeUtil.formatTsRange(tsRange) + " to " + RangeUtil.formatTsRange(cachablePeriods.get(0)));
-                            }
-                            tsRange = cachablePeriods.get(0);
-                        } else {
-                            //give up updating the cache, in avoid to make cache complicated
-                            return null;
-                        }
-                    }
-
-                    StreamSQLResult newCacheEntry = new StreamSQLResult(input, tsRange, partitionColRef);
-                    cacheManager.getCache(STORAGE_LAYER_TUPLE_CACHE).put(new Element(streamSQLDigest, newCacheEntry));
-                    logger.debug("cache after the query: " + newCacheEntry);
-                    return null;
-                }
-            });
-
-            return tee;
-        } else {
-            return ret;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bc426cc0/storage/src/main/java/org/apache/kylin/storage/cache/StorageLayerCache.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cache/StorageLayerCache.java b/storage/src/main/java/org/apache/kylin/storage/cache/StorageLayerCache.java
deleted file mode 100644
index 9b4caa5..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/cache/StorageLayerCache.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package org.apache.kylin.storage.cache;
-
-/**
- */
-public interface StorageLayerCache {
-    boolean isCacheEnabled();
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bc426cc0/storage/src/main/java/org/apache/kylin/storage/cache/StreamSQLResult.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cache/StreamSQLResult.java b/storage/src/main/java/org/apache/kylin/storage/cache/StreamSQLResult.java
index 5e58822..e895d8e 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cache/StreamSQLResult.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cache/StreamSQLResult.java
@@ -1,22 +1,19 @@
 package org.apache.kylin.storage.cache;
 
-import java.util.Iterator;
-import java.util.List;
-import java.util.NavigableMap;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Preconditions;
-import org.apache.kylin.common.util.RangeUtil;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.ITuple;
-import org.apache.kylin.storage.tuple.Tuple;
-
 import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
+import org.apache.kylin.common.util.RangeUtil;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.ITuple;
+import org.apache.kylin.storage.tuple.Tuple;
+
+import javax.annotation.Nullable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableMap;
 
 /**
  */
@@ -27,20 +24,22 @@ public class StreamSQLResult {
 
     public StreamSQLResult(List<ITuple> rows, Range<Long> timeCovered, TblColRef partitionCol) {
 
-        Preconditions.checkArgument(timeCovered.hasUpperBound(),"StreamSQLResult requires timeCovered having a upperBound");
-
         sortedRows = Maps.newTreeMap();
         for (ITuple row : rows) {
 
-            long t = Tuple.getTs(row,partitionCol);
+            if (partitionCol != null) {
+                long t = Tuple.getTs(row, partitionCol);
 
-            //will only cache rows that are within the time range
-            if (timeCovered.contains(t)) {
-                if (!this.sortedRows.containsKey(t)) {
-                    this.sortedRows.put(t, Lists.newArrayList(row));
-                } else {
-                    this.sortedRows.get(t).add(row);
+                //will only cache rows that are within the time range
+                if (timeCovered.contains(t)) {
+                    if (!this.sortedRows.containsKey(t)) {
+                        this.sortedRows.put(t, Lists.newArrayList(row));
+                    } else {
+                        this.sortedRows.get(t).add(row);
+                    }
                 }
+            } else {
+                this.sortedRows.get(0L).add(row);
             }
         }
         this.timeCovered = timeCovered;
@@ -76,8 +75,7 @@ public class StreamSQLResult {
         return sortedRows.size() + " tuples cached for period " + RangeUtil.formatTsRange(timeCovered);
     }
 
-    public long getEndTime()
-    {
+    public long getEndTime() {
         return this.timeCovered.upperEndpoint();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bc426cc0/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageEngine.java
index 1633f25..a353f49 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageEngine.java
@@ -1,11 +1,8 @@
 package org.apache.kylin.storage.cube;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -31,8 +28,7 @@ import org.apache.kylin.storage.tuple.TupleInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import java.util.*;
 
 public class CubeStorageEngine implements IStorageEngine {
 
@@ -97,6 +93,16 @@ public class CubeStorageEngine implements IStorageEngine {
         return new SerializedCubeTupleIterator(scanners);
     }
 
+    /**
+     * This engine never excludes any time range from caching.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public Range<Long> getVolatilePeriod() {
+        return null;
+    }
+
+    /**
+     * @return always {@code false} for this engine
+     */
+    @Override
+    public boolean isDynamic() {
+        return false;
+    }
+
     private void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
         for (FunctionDesc func : sqlDigest.aggregations) {
             if (!func.isDimensionAsMetric()) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bc426cc0/storage/src/main/java/org/apache/kylin/storage/cube/SerializedCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/SerializedCubeTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/cube/SerializedCubeTupleIterator.java
index e69a1c7..7b9286c 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/SerializedCubeTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/SerializedCubeTupleIterator.java
@@ -1,11 +1,10 @@
 package org.apache.kylin.storage.cube;
 
-import java.util.List;
-
-import com.google.common.collect.Range;
 import org.apache.kylin.metadata.tuple.ITuple;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
 
+import java.util.List;
+
 public class SerializedCubeTupleIterator implements ITupleIterator {
 
     public SerializedCubeTupleIterator(List<CubeScanner> scanners) {
@@ -35,9 +34,5 @@ public class SerializedCubeTupleIterator implements ITupleIterator {
 
     }
 
-    @Override
-    public Range<Long> getCacheExcludedPeriod() {
-        return null;
-    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bc426cc0/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
index a9b4f92..a8027c0 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
@@ -18,23 +18,15 @@
 
 package org.apache.kylin.storage.hbase;
 
-import java.text.MessageFormat;
-import java.util.*;
-
 import com.google.common.collect.Lists;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FuzzyRowFilter;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.kylin.common.persistence.StorageException;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.kv.RowValueDecoder;
@@ -49,7 +41,8 @@ import org.apache.kylin.storage.tuple.TupleInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Range;
+import java.text.MessageFormat;
+import java.util.*;
 
 /**
  * @author xjiang
@@ -106,10 +99,6 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
         }
     }
 
-    @Override
-    public Range<Long> getCacheExcludedPeriod() {
-        return null;
-    }
 
     @Override
     public boolean hasNext() {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bc426cc0/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
index ab75a3c..699cc41 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
@@ -18,44 +18,43 @@
 
 package org.apache.kylin.storage.hbase;
 
-import java.util.*;
-
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.storage.hbase.coprocessor.observer.ObserverEnabler;
-import org.apache.kylin.storage.tuple.TupleInfo;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.storage.StorageContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
-
+import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.kv.RowValueDecoder;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.cube.model.HBaseColumnDesc;
 import org.apache.kylin.cube.model.HBaseMappingDesc;
-import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.dict.lookup.LookupStringTable;
-import org.apache.kylin.storage.IStorageEngine;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.LogicalTupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.SQLDigest;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.storage.IStorageEngine;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.hbase.coprocessor.observer.ObserverEnabler;
+import org.apache.kylin.storage.tuple.TupleInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
 
 /**
  * @author xjiang, yangli9
@@ -133,6 +132,16 @@ public class CubeStorageEngine implements IStorageEngine {
         return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, valueDecoders, context, returnTupleInfo);
     }
 
+    /**
+     * This engine never excludes any time range from caching.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public Range<Long> getVolatilePeriod() {
+        return null;
+    }
+
+    /**
+     * @return always {@code false} for this engine
+     */
+    @Override
+    public boolean isDynamic() {
+        return false;
+    }
+
     private void buildDimensionsAndMetrics(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, SQLDigest sqlDigest) {
 
         for (FunctionDesc func : sqlDigest.aggregations) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bc426cc0/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java
index 9259db6..c94222e 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java
@@ -18,8 +18,7 @@
 
 package org.apache.kylin.storage.hbase;
 
-import java.util.ArrayList;
-
+import com.google.common.collect.Range;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.invertedindex.IIInstance;
@@ -33,6 +32,8 @@ import org.apache.kylin.storage.tuple.TupleInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+
 /**
  * @author yangli9
  */
@@ -41,6 +42,7 @@ public class InvertedIndexStorageEngine implements IStorageEngine {
     private static Logger logger = LoggerFactory.getLogger(InvertedIndexStorageEngine.class);
 
     private IISegment seg;
+    private EndpointTupleIterator dataIterator;
 
     public InvertedIndexStorageEngine(IIInstance ii) {
         this.seg = ii.getFirstSegment();
@@ -54,10 +56,21 @@ public class InvertedIndexStorageEngine implements IStorageEngine {
         @SuppressWarnings("deprecation")
         HConnection conn = HBaseConnection.get(context.getConnUrl());
         try {
-            return new EndpointTupleIterator(seg, sqlDigest.filter, sqlDigest.groupbyColumns, new ArrayList<>(sqlDigest.aggregations), context, conn, returnTupleInfo);
+            dataIterator = new EndpointTupleIterator(seg, sqlDigest.filter, sqlDigest.groupbyColumns, new ArrayList<>(sqlDigest.aggregations), context, conn, returnTupleInfo);
+            return dataIterator;
         } catch (Throwable e) {
             logger.error("Error when connecting to II htable " + tableName, e);
             throw new IllegalStateException("Error when connecting to II htable " + tableName, e);
         }
     }
+
+    /**
+     * Returns the cache-excluded period of the iterator created by the most recent search() call.
+     * <p>
+     * NOTE(review): dataIterator is only assigned inside search(), so calling this before any search()
+     * dereferences null; and because it is a single field, a later search() replaces the iterator an
+     * earlier caller may still be asking about - confirm the engine is not shared across queries.
+     *
+     * @return the range reported by the last EndpointTupleIterator's getCacheExcludedPeriod()
+     */
+    @Override
+    public Range<Long> getVolatilePeriod() {
+        return dataIterator.getCacheExcludedPeriod();
+    }
+
+    /**
+     * @return always {@code true} for this engine
+     */
+    @Override
+    public boolean isDynamic() {
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bc426cc0/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
index 3bb8b80..9cc9eb2 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
@@ -149,8 +149,4 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
         }
     }
 
-    @Override
-    public Range<Long> getCacheExcludedPeriod() {
-        return null;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bc426cc0/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
index d9ede57..9cb5f36 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
@@ -19,10 +19,10 @@
 package org.apache.kylin.storage.hbase.coprocessor.endpoint;
 
 import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.*;
 import com.google.protobuf.ByteString;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -82,7 +82,7 @@ public class EndpointTupleIterator implements ITupleIterator {
     private HTableInterface table = null;
 
     private TblColRef partitionCol;
-    private long lastDataTime;
+    private long lastDataTime = -1;
     private int rowsInAllMetric = 0;
 
     public EndpointTupleIterator(IISegment segment, TupleFilter rootFilter, Collection<TblColRef> groupBy, List<FunctionDesc> measures, StorageContext context, HConnection conn, TupleInfo returnTupleInfo) throws Throwable {
@@ -139,8 +139,7 @@ public class EndpointTupleIterator implements ITupleIterator {
 
         //decompress
         Collection<IIProtos.IIResponseInternal> shardResults = new ArrayList<>();
-        for(IIProtos.IIResponse input : compressedShardResults)
-        {
+        for (IIProtos.IIResponse input : compressedShardResults) {
             byte[] compressed = input.getBlob().toByteArray();
             try {
                 byte[] decompressed = CompressionUtils.decompress(compressed);
@@ -251,8 +250,14 @@ public class EndpointTupleIterator implements ITupleIterator {
         logger.info("Closed after " + rowsInAllMetric + " rows are fetched");
     }
 
-    @Override
+    /**
+     * Tells the storage layer cache which time period of data should not be cached:
+     * everything strictly after {@code lastDataTime}.
+     * <p>
+     * {@code lastDataTime} starts out as -1 and must have been set before this is called; this method
+     * itself never returns null.
+     *
+     * @return the open-ended range of timestamps greater than {@code lastDataTime}
+     * @throws IllegalStateException if {@code lastDataTime} still holds its initial value of -1
+     */
     public Range<Long> getCacheExcludedPeriod() {
+        // This guards object state rather than a caller-supplied argument, hence checkState.
+        Preconditions.checkState(lastDataTime != -1, "lastDataTime is not set yet");
         return Ranges.greaterThan(lastDataTime);
     }
 
@@ -333,7 +338,7 @@ public class EndpointTupleIterator implements ITupleIterator {
             }
 
             index++;
-            
+
             return tupleConverter.makeTuple(this.tableRecord, this.measureValues, this.tuple);
         }
 
@@ -346,9 +351,5 @@ public class EndpointTupleIterator implements ITupleIterator {
         public void close() {
         }
 
-        @Override
-        public Range<Long> getCacheExcludedPeriod() {
-            throw new NotImplementedException();
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bc426cc0/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/TsConditionExtractor.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/TsConditionExtractor.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/TsConditionExtractor.java
index 18155c6..27a919c 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/TsConditionExtractor.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/TsConditionExtractor.java
@@ -1,13 +1,13 @@
 package org.apache.kylin.storage.hbase.coprocessor.endpoint;
 
-import com.google.common.collect.Ranges;
+import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.LogicalTupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.common.util.DateFormat;
 
 import com.google.common.collect.Range;
+import com.google.common.collect.Ranges;
 
 /**
  */
@@ -23,6 +23,9 @@ public class TsConditionExtractor {
     }
 
     private static Range<Long> extractTsConditionInternal(TupleFilter filter, TblColRef colRef) {
+        if (filter == null) {
+            return Ranges.all();
+        }
 
         if (filter instanceof LogicalTupleFilter) {
             if (filter.getOperator() == TupleFilter.FilterOperatorEnum.AND) {
@@ -41,6 +44,8 @@ public class TsConditionExtractor {
                 }
                 return ret.isEmpty() ? null : ret;
             } else {
+                // For conditions such as date > DATE '2000-11-11' OR date < DATE '1999-01-01',
+                // we return Ranges.all() rather than representing them as two separate ranges.
                 return Ranges.all();
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bc426cc0/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
index 87ff26b..f9cea05 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
@@ -1,12 +1,14 @@
 package org.apache.kylin.storage.hybrid;
 
-import javax.annotation.Nullable;
-
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+import com.google.common.collect.Ranges;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.RangeUtil;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.SQLDigest;
 import org.apache.kylin.metadata.realization.SQLDigestUtil;
 import org.apache.kylin.metadata.tuple.CompoundTupleIterator;
@@ -16,25 +18,27 @@ import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.StorageEngineFactory;
 import org.apache.kylin.storage.tuple.TupleInfo;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Ranges;
+import javax.annotation.Nullable;
 
 /**
  */
 public class HybridStorageEngine implements IStorageEngine {
 
     private HybridInstance hybridInstance;
+    private IStorageEngine historicalStorageEngine;
+    private IStorageEngine realtimeStorageEngine;
 
     public HybridStorageEngine(HybridInstance hybridInstance) {
         this.hybridInstance = hybridInstance;
+        this.historicalStorageEngine = StorageEngineFactory.getStorageEngine(this.hybridInstance.getHistoryRealizationInstance());
+        this.realtimeStorageEngine = StorageEngineFactory.getStorageEngine(this.hybridInstance.getRealTimeRealizationInstance());
     }
 
     @Override
     public ITupleIterator search(final StorageContext context, final SQLDigest sqlDigest, final TupleInfo returnTupleInfo) {
 
         // search the historic realization
-        ITupleIterator iterator1 = searchRealization(hybridInstance.getHistoryRealizationInstance(), context, sqlDigest, returnTupleInfo);
+        ITupleIterator historicalDataIterator = this.historicalStorageEngine.search(context, sqlDigest, returnTupleInfo);
 
         String modelName = hybridInstance.getModelName();
         MetadataManager metaMgr = getMetadataManager();
@@ -42,28 +46,32 @@ public class HybridStorageEngine implements IStorageEngine {
 
         // if the model isn't partitioned, only query the history
         if (modelDesc.getPartitionDesc() == null || modelDesc.getPartitionDesc().getPartitionDateColumnRef() == null)
-            return iterator1;
+            return historicalDataIterator;
 
         TblColRef partitionColRef = modelDesc.getPartitionDesc().getPartitionDateColumnRef();
 
-        ITupleIterator iterator2 = SQLDigestUtil.appendTsFilterToExecute(sqlDigest, partitionColRef, //
+        ITupleIterator realtimeDataIterator = SQLDigestUtil.appendTsFilterToExecute(sqlDigest, partitionColRef, //
                 Ranges.atLeast(hybridInstance.getHistoryRealizationInstance().getDateRangeEnd()),//
                 new Function<Void, ITupleIterator>() {
                     @Nullable
                     @Override
                     public ITupleIterator apply(@Nullable Void input) {
-                        ITupleIterator iterator2 = searchRealization(hybridInstance.getRealTimeRealizationInstance(), context, sqlDigest, returnTupleInfo);
-                        return iterator2;
+                        return realtimeStorageEngine.search(context, sqlDigest, returnTupleInfo);
                     }
                 });
 
         // combine history and real-time tuple iterator
-        return new CompoundTupleIterator(Lists.newArrayList(iterator1, iterator2));
+        return new CompoundTupleIterator(Lists.newArrayList(historicalDataIterator, realtimeDataIterator));
+    }
+
+    @Override
+    public Range<Long> getVolatilePeriod() {
+        return RangeUtil.merge(historicalStorageEngine.getVolatilePeriod(), realtimeStorageEngine.getVolatilePeriod());
     }
 
-    private ITupleIterator searchRealization(IRealization realization, StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
-        IStorageEngine storageEngine = StorageEngineFactory.getStorageEngine(realization, false);
-        return storageEngine.search(context, sqlDigest, returnTupleInfo);
+    @Override
+    public boolean isDynamic() {
+        return true;
     }
 
     private MetadataManager getMetadataManager() {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bc426cc0/storage/src/test/java/org/apache/kylin/storage/test/StorageTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/test/StorageTest.java b/storage/src/test/java/org/apache/kylin/storage/test/StorageTest.java
index 2a258df..fece398 100644
--- a/storage/src/test/java/org/apache/kylin/storage/test/StorageTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/test/StorageTest.java
@@ -18,12 +18,7 @@
 
 package org.apache.kylin.storage.test;
 
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
+import com.google.common.collect.ImmutableList;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
 import org.apache.kylin.cube.CubeInstance;
@@ -41,7 +36,11 @@ import org.apache.kylin.storage.hbase.ScanOutOfLimitException;
 import org.apache.kylin.storage.tuple.TupleInfo;
 import org.junit.*;
 
-import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
 
 public class StorageTest extends HBaseMetadataTestCase {
 
@@ -64,7 +63,7 @@ public class StorageTest extends HBaseMetadataTestCase {
         CubeManager cubeMgr = CubeManager.getInstance(getTestConfig());
         cube = cubeMgr.getCube("TEST_KYLIN_CUBE_WITHOUT_SLR_EMPTY");
         Assert.assertNotNull(cube);
-        storageEngine = StorageEngineFactory.getStorageEngine(cube, false);
+        storageEngine = StorageEngineFactory.getStorageEngine(cube);
         String url = KylinConfig.getInstanceFromEnv().getStorageUrl();
         context = new StorageContext();
         context.setConnUrl(url);