You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/01 11:36:18 UTC

incubator-kylin git commit: KYLIN-801 fix remaining issues on query cache and storage cache

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 33a88bcb3 -> c7495de91


KYLIN-801 fix remaining issues on query cache and storage cache


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/c7495de9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/c7495de9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/c7495de9

Branch: refs/heads/0.8.0
Commit: c7495de919ae44bac00594df13473be697030fc8
Parents: 33a88bc
Author: honma <ho...@ebay.com>
Authored: Mon Jun 1 14:14:25 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Mon Jun 1 17:33:49 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/cache/CacheUpdater.java | 11 +++
 .../kylin/common/cache/LocalCacheUpdater.java   | 18 +++++
 .../kylin/common/cache/RemoteCacheUpdater.java  | 14 ++++
 .../common/restclient/AbstractRestCache.java    | 12 ++-
 .../kylin/common/restclient/Broadcaster.java    |  4 +-
 .../common/restclient/SingleValueCache.java     | 16 ++--
 .../java/org/apache/kylin/cube/CubeManager.java | 31 ++++----
 jdbc/.gitignore                                 |  1 +
 .../job/cube/UpdateCubeInfoAfterBuildStep.java  |  4 +-
 .../job/cube/UpdateCubeInfoAfterMergeStep.java  | 24 +++---
 .../job/streaming/CubeStreamBuilderTest.java    |  1 -
 .../kylin/metadata/tuple/TeeTupleIterator.java  | 11 ++-
 .../metadata/tuple/TeeTupleItrListener.java     |  2 +-
 .../apache/kylin/rest/constant/Constant.java    |  8 +-
 .../kylin/rest/controller/QueryController.java  | 78 +++++++++-----------
 .../apache/kylin/rest/service/BasicService.java | 26 +++----
 .../apache/kylin/rest/service/CacheService.java | 44 ++++++-----
 .../apache/kylin/rest/service/CubeService.java  | 15 ++--
 .../apache/kylin/rest/service/JobService.java   |  2 +-
 .../apache/kylin/rest/service/ModelService.java |  1 -
 .../src/main/resources/applicationContext.xml   |  6 ++
 server/src/main/resources/ehcache-test.xml      | 44 +++++------
 server/src/main/resources/ehcache.xml           | 44 +++++------
 .../kylin/rest/service/CacheServiceTest.java    | 30 +++++---
 .../kylin/storage/StorageEngineFactory.java     |  2 +-
 .../AbstractCacheFledgedStorageEngine.java      | 66 ++++++++++++-----
 .../cache/CacheFledgedDynamicStorageEngine.java | 11 +--
 .../cache/CacheFledgedStaticStorageEngine.java  | 29 ++++++--
 .../kylin/storage/test/StaticCacheTest.java     | 17 ++---
 29 files changed, 334 insertions(+), 238 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c7495de9/common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java b/common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java
new file mode 100644
index 0000000..85fdf8f
--- /dev/null
+++ b/common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java
@@ -0,0 +1,11 @@
+package org.apache.kylin.common.cache;
+
+import org.apache.kylin.common.restclient.AbstractRestCache;
+import org.apache.kylin.common.restclient.Broadcaster;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 6/1/15.
+ */
+public interface CacheUpdater {
+    void updateCache(Object key, Object value, Broadcaster.EVENT syncAction, Broadcaster.TYPE type, AbstractRestCache cache);
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c7495de9/common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java b/common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java
new file mode 100644
index 0000000..647623d
--- /dev/null
+++ b/common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java
@@ -0,0 +1,18 @@
+package org.apache.kylin.common.cache;
+
+import org.apache.kylin.common.restclient.AbstractRestCache;
+import org.apache.kylin.common.restclient.Broadcaster;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 6/1/15.
+ */
+public class LocalCacheUpdater implements CacheUpdater {
+    @Override
+    public void updateCache(Object key, Object value, Broadcaster.EVENT syncAction, Broadcaster.TYPE type, AbstractRestCache cache) {
+        if (syncAction == Broadcaster.EVENT.CREATE || syncAction == Broadcaster.EVENT.UPDATE) {
+            cache.putLocal(key, value);
+        } else if (syncAction == Broadcaster.EVENT.DROP) {
+            cache.removeLocal(key);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c7495de9/common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java b/common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java
new file mode 100644
index 0000000..8419f75
--- /dev/null
+++ b/common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java
@@ -0,0 +1,14 @@
+package org.apache.kylin.common.cache;
+
+import org.apache.kylin.common.restclient.AbstractRestCache;
+import org.apache.kylin.common.restclient.Broadcaster;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 6/1/15.
+ */
+public class RemoteCacheUpdater implements CacheUpdater {
+    @Override
+    public void updateCache(Object key, Object value, Broadcaster.EVENT syncAction, Broadcaster.TYPE type, AbstractRestCache cache) {
+        Broadcaster.getInstance().queue(type.getType(), syncAction.getType(), key.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c7495de9/common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java b/common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java
index c728ff2..043a3b4 100644
--- a/common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java
+++ b/common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java
@@ -18,12 +18,20 @@
 
 package org.apache.kylin.common.restclient;
 
+import org.apache.kylin.common.cache.CacheUpdater;
+import org.apache.kylin.common.cache.LocalCacheUpdater;
+
 /**
  * @author xjiang
  * 
  */
 public abstract class AbstractRestCache<K, V> {
 
+    protected static CacheUpdater cacheUpdater = new LocalCacheUpdater();
+
+    public static void setCacheUpdater(CacheUpdater cu) {
+        cacheUpdater = cu;
+    }
 
     protected final Broadcaster.TYPE syncType;
 
@@ -31,10 +39,6 @@ public abstract class AbstractRestCache<K, V> {
         this.syncType = syncType;
     }
 
-    protected final void syncRemote(K key, Broadcaster.EVENT syncAction) {
-        Broadcaster.getInstance().queue(syncType.getType(), syncAction.getType(), key.toString());
-    }
-
     public abstract void put(K key, V value);
 
     public abstract void putLocal(K key, V value);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c7495de9/common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java b/common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
index 065b64b..03a0d34 100644
--- a/common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
+++ b/common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
@@ -20,8 +20,8 @@ package org.apache.kylin.common.restclient;
 
 import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
-import org.apache.kylin.common.KylinConfig;
 import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -136,7 +136,7 @@ public class Broadcaster {
     }
 
     public enum TYPE {
-        ALL("all"), CUBE("cube"), CUBE_DESC("cube_desc"), PROJECT("project"), INVERTED_INDEX("inverted_index"), INVERTED_INDEX_DESC("ii_desc"), TABLE("table"), DATA_MODEL("data_model"), HYBRID("hybrid");
+        CUBE("cube"), CUBE_DESC("cube_desc"), PROJECT("project"), INVERTED_INDEX("inverted_index"), INVERTED_INDEX_DESC("ii_desc"), TABLE("table"), DATA_MODEL("data_model"), HYBRID("hybrid");
         private String text;
 
         TYPE(String text) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c7495de9/common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java b/common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
index 00f8a3e..0323f11 100644
--- a/common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
+++ b/common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
@@ -44,14 +44,11 @@ public abstract class SingleValueCache<K, V> extends AbstractRestCache<K, V> {
 
     public void put(K key, V value) {
         boolean exists = innerCache.containsKey(key);
-        //The put operation will be duplicated when REST request is received.
-        //It is intended so because many test cases does not have REST env
-        innerCache.put(key, value);
 
         if (!exists) {
-            syncRemote(key, Broadcaster.EVENT.CREATE);
+            cacheUpdater.updateCache(key, value, Broadcaster.EVENT.CREATE, syncType, this);
         } else {
-            syncRemote(key, Broadcaster.EVENT.UPDATE);
+            cacheUpdater.updateCache(key, value, Broadcaster.EVENT.UPDATE, syncType, this);
         }
     }
 
@@ -61,15 +58,12 @@ public abstract class SingleValueCache<K, V> extends AbstractRestCache<K, V> {
 
     public void remove(K key) {
         if (innerCache.containsKey(key)) {
-            //The remove operation will be duplicated when REST request is received.
-            //It is intended so because many test cases does not have REST env
-            innerCache.remove(key);
-            syncRemote(key, Broadcaster.EVENT.DROP);
+            cacheUpdater.updateCache(key,null,Broadcaster.EVENT.DROP,syncType,this);
         }
     }
 
     public void removeLocal(K key) {
-        innerCache.remove(key);
+         innerCache.remove(key);
     }
 
     public void clear() {
@@ -98,5 +92,5 @@ public abstract class SingleValueCache<K, V> extends AbstractRestCache<K, V> {
 
     public Set<K> keySet() {
         return innerCache.keySet();
-    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c7495de9/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 812457a..5c651fc 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -208,19 +208,16 @@ public class CubeManager implements IRealizationProvider {
             CubeDescManager.getInstance(config).removeCubeDesc(cube.getDescriptor());
         }
 
-        removeCube(cube);
+        // remove cube and update cache
+        getStore().deleteResource(cube.getResourcePath());
+        cubeMap.remove(cube.getName());
+
         // delete cube from project
         ProjectManager.getInstance(config).removeRealizationsFromProjects(RealizationType.CUBE, cubeName);
 
         return cube;
     }
 
-    private void removeCube(CubeInstance cube) throws IOException {
-        // remove cube and update cache
-        getStore().deleteResource(cube.getResourcePath());
-        cubeMap.remove(cube.getName());
-    }
-
     // sync on update
     public CubeInstance createCube(String cubeName, String projectName, CubeDesc desc, String owner) throws IOException {
         logger.info("Creating cube '" + projectName + "-->" + cubeName + "' from desc '" + desc.getName() + "'");
@@ -236,7 +233,15 @@ public class CubeManager implements IRealizationProvider {
     }
 
     /**
-     * if not sure whether to enable updateProject, just use it
+     *
+     * @param cube
+     * @param updateProject Updating project is necessary when you want the project's JDBC connection
+     *                      to reflect the content in the cube. So basically you only set it to true when
+     *                      a cube first turns into READY, or when the cube's status changes from READY
+     *                      to not READY.
+     *                      If not sure whether to enable updateProject, just use it.
+     * @return
+     * @throws IOException
      */
     public CubeInstance updateCube(CubeInstance cube, boolean updateProject) throws IOException {
 
@@ -396,15 +401,13 @@ public class CubeManager implements IRealizationProvider {
         try {
             return reloadCubeLocalAt(CubeInstance.concatResourcePath(cubeName));
         } catch (IOException e) {
-            logger.error(e.getLocalizedMessage(), e);
+            throw new RuntimeException(e);
         }
-
-        return null;
     }
 
     public void removeCubeLocal(String cubeName) {
-        cubeMap.removeLocal(cubeName);
         usedStorageLocation.removeAll(cubeName.toUpperCase());
+        cubeMap.removeLocal(cubeName);
     }
 
     public LookupStringTable getLookupTable(CubeSegment cubeSegment, DimensionDesc dim) {
@@ -515,7 +518,7 @@ public class CubeManager implements IRealizationProvider {
         return null;
     }
 
-    public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment... newSegments) throws IOException {
+    public void promoteNewlyBuiltSegments(CubeInstance cube, boolean updateProj, CubeSegment... newSegments) throws IOException {
         List<CubeSegment> tobe = calculateToBeSegments(cube);
 
         for (CubeSegment seg : newSegments) {
@@ -540,7 +543,7 @@ public class CubeManager implements IRealizationProvider {
         cube.setStatus(RealizationStatusEnum.READY);
 
         logger.info("Promoting cube " + cube + ", new segments " + newSegments);
-        updateCube(cube, true);
+        updateCube(cube, updateProj);
     }
 
     public void validateNewSegments(CubeInstance cube, CubeSegment... newSegments) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c7495de9/jdbc/.gitignore
----------------------------------------------------------------------
diff --git a/jdbc/.gitignore b/jdbc/.gitignore
new file mode 100644
index 0000000..c78fc40
--- /dev/null
+++ b/jdbc/.gitignore
@@ -0,0 +1 @@
+kylin_jdbc.log*

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c7495de9/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java b/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java
index 8140003..1472424 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java
@@ -125,9 +125,9 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
 
         try {
             if (segmentReady) {
-                cubeManager.promoteNewlyBuiltSegments(cube, segment);
+                cubeManager.promoteNewlyBuiltSegments(cube, cube.getSegments().size() == 1, segment);
             } else {
-                cubeManager.updateCube(cube, true);
+                cubeManager.updateCube(cube, false);
             }
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c7495de9/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java b/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java
index 1836e17..d7b07ac 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java
@@ -18,15 +18,9 @@
 
 package org.apache.kylin.job.cube;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.lang.StringUtils;
-
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -38,6 +32,11 @@ import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
 /**
  */
 public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
@@ -57,7 +56,7 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
     @Override
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         final CubeInstance cube = cubeManager.getCube(getCubeName());
-        
+
         CubeSegment mergedSegment = cube.getSegmentById(getSegmentId());
         if (mergedSegment == null) {
             return new ExecuteResult(ExecuteResult.State.FAILED, "there is no segment with id:" + getSegmentId());
@@ -78,17 +77,16 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
             sourceCount += segment.getInputRecords();
             sourceSize += segment.getInputRecordsSize();
         }
-        
+
         // update segment info
         mergedSegment.setSizeKB(cubeSize);
         mergedSegment.setInputRecords(sourceCount);
         mergedSegment.setInputRecordsSize(sourceSize);
         mergedSegment.setLastBuildJobID(getCubingJobId());
         mergedSegment.setLastBuildTime(System.currentTimeMillis());
-        mergedSegment.setStatus(SegmentStatusEnum.READY);
-        
+
         try {
-            cubeManager.promoteNewlyBuiltSegments(cube, mergedSegment);
+            cubeManager.promoteNewlyBuiltSegments(cube, false, mergedSegment);
             return new ExecuteResult(ExecuteResult.State.SUCCEED);
         } catch (IOException e) {
             logger.error("fail to update cube after merge", e);
@@ -121,7 +119,7 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
         if (ids != null) {
             final String[] splitted = StringUtils.split(ids, ",");
             ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
-            for (String id: splitted) {
+            for (String id : splitted) {
                 result.add(id);
             }
             return result;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c7495de9/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java
index f6babeb..f0ef5e1 100644
--- a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java
+++ b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java
@@ -74,5 +74,4 @@ public class CubeStreamBuilderTest {
         }
         queue.put(StreamMessage.EOF);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c7495de9/metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleIterator.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleIterator.java b/metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleIterator.java
index 0618b45..43b469c 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleIterator.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleIterator.java
@@ -1,30 +1,37 @@
 package org.apache.kylin.metadata.tuple;
 
 import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 
 /**
- *
  * Like "tee" command in linux, it effectively duplicates the underlying
  * ITupleIterator's results
  */
 public class TeeTupleIterator implements ITupleIterator {
 
+    private static final Logger logger = LoggerFactory.getLogger(TeeTupleIterator.class);
+
     private ITupleIterator underlying;
     private List<ITuple> duplicatedData;
     private List<TeeTupleItrListener> listeners = Lists.newArrayList();
+    private long createTime;
 
     public TeeTupleIterator(ITupleIterator underlying) {
         this.underlying = underlying;
         this.duplicatedData = Lists.newArrayList();
+        this.createTime = System.currentTimeMillis();
     }
 
     @Override
     public void close() {
         this.underlying.close();
+
+
         for (TeeTupleItrListener listener : this.listeners) {
-            listener.notify(this.duplicatedData);
+            listener.notify(this.duplicatedData,this.createTime);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c7495de9/metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleItrListener.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleItrListener.java b/metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleItrListener.java
index 50b94bd..5bf86df 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleItrListener.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleItrListener.java
@@ -6,5 +6,5 @@ import java.util.List;
  * Created by Hongbin Ma(Binmahone) on 5/13/15.
  */
 public interface TeeTupleItrListener {
-    void notify(List<ITuple> duplicated);
+    void notify(List<ITuple> duplicated,long createTime);
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c7495de9/server/src/main/java/org/apache/kylin/rest/constant/Constant.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/constant/Constant.java b/server/src/main/java/org/apache/kylin/rest/constant/Constant.java
index 5a2f8e1..f068e5f 100644
--- a/server/src/main/java/org/apache/kylin/rest/constant/Constant.java
+++ b/server/src/main/java/org/apache/kylin/rest/constant/Constant.java
@@ -24,19 +24,15 @@ package org.apache.kylin.rest.constant;
  */
 public class Constant {
 
-    // @hardcode
     public final static String FakeSchemaName = "defaultSchema";
-
-    // @hardcode
     public final static String FakeCatalogName = "defaultCatalog";
 
     public final static String IDENTITY_USER = "user";
-
     public final static String IDENTITY_ROLE = "role";
 
     public final static String ROLE_ADMIN = "ROLE_ADMIN";
-    public static final String ROLE_MODELER = "ROLE_MODELER";
-    public static final String ROLE_ANALYST = "ROLE_ANALYST";
+    public final static String ROLE_MODELER = "ROLE_MODELER";
+    public final static String ROLE_ANALYST = "ROLE_ANALYST";
 
     public final static String ACCESS_HAS_ROLE_ADMIN = "hasRole('ROLE_ADMIN')";
     public final static String ACCESS_HAS_ROLE_MODELER = "hasRole('ROLE_MODELER')";

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c7495de9/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java b/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
index d13efa8..e3cfc95 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
@@ -18,50 +18,47 @@
 
 package org.apache.kylin.rest.controller;
 
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import javax.servlet.http.HttpServletResponse;
-
+import com.codahale.metrics.annotation.Timed;
+import com.google.common.base.Preconditions;
 import net.sf.ehcache.Cache;
 import net.sf.ehcache.CacheManager;
 import net.sf.ehcache.Element;
-
 import org.apache.commons.io.IOUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.exception.ForbiddenException;
 import org.apache.kylin.rest.exception.InternalErrorException;
+import org.apache.kylin.rest.model.Query;
+import org.apache.kylin.rest.model.SelectedColumnMeta;
+import org.apache.kylin.rest.model.TableMeta;
 import org.apache.kylin.rest.request.MetaRequest;
+import org.apache.kylin.rest.request.PrepareSqlRequest;
+import org.apache.kylin.rest.request.SQLRequest;
+import org.apache.kylin.rest.request.SaveSqlRequest;
 import org.apache.kylin.rest.response.SQLResponse;
+import org.apache.kylin.rest.service.QueryService;
+import org.apache.kylin.rest.util.QueryUtil;
+import org.apache.kylin.storage.cache.AbstractCacheFledgedStorageEngine;
+import org.apache.kylin.storage.hbase.ScanOutOfLimitException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.security.access.AccessDeniedException;
 import org.springframework.security.core.context.SecurityContextHolder;
 import org.springframework.stereotype.Controller;
-import org.springframework.web.bind.annotation.PathVariable;
-import org.springframework.web.bind.annotation.RequestBody;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestMethod;
-import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.*;
 import org.supercsv.io.CsvListWriter;
 import org.supercsv.io.ICsvListWriter;
 import org.supercsv.prefs.CsvPreference;
 
-import com.codahale.metrics.annotation.Timed;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.rest.constant.Constant;
-import org.apache.kylin.rest.model.Query;
-import org.apache.kylin.rest.model.SelectedColumnMeta;
-import org.apache.kylin.rest.model.TableMeta;
-import org.apache.kylin.rest.request.PrepareSqlRequest;
-import org.apache.kylin.rest.request.SQLRequest;
-import org.apache.kylin.rest.request.SaveSqlRequest;
-import org.apache.kylin.rest.service.QueryService;
-import org.apache.kylin.rest.util.QueryUtil;
+import javax.annotation.PostConstruct;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
 
 /**
  * Handle query requests.
@@ -73,7 +70,6 @@ public class QueryController extends BasicController {
 
     private static final Logger logger = LoggerFactory.getLogger(QueryController.class);
 
-    public static final String SUCCESS_QUERY_CACHE = "SuccessQueryCache";
     public static final String EXCEPTION_QUERY_CACHE = "ExceptionQueryCache";
 
     @Autowired
@@ -82,6 +78,12 @@ public class QueryController extends BasicController {
     @Autowired
     private CacheManager cacheManager;
 
+    @PostConstruct
+    public void init() throws IOException {
+        Preconditions.checkNotNull(cacheManager, "cacheManager is not injected yet");
+        AbstractCacheFledgedStorageEngine.setCacheManager(cacheManager);
+    }
+
     @RequestMapping(value = "/query", method = RequestMethod.POST)
     @ResponseBody
     @Timed(name = "query")
@@ -202,47 +204,39 @@ public class QueryController extends BasicController {
         SQLResponse sqlResponse = searchQueryInCache(sqlRequest);
         try {
             if (null == sqlResponse) {
-                long startTimestamp = System.currentTimeMillis();
                 sqlResponse = queryService.query(sqlRequest);
-                long queryRealExecutionTime = System.currentTimeMillis() - startTimestamp;
-
-                long durationThreshold = KylinConfig.getInstanceFromEnv().getQueryDurationCacheThreshold();
-                long scancountThreshold = KylinConfig.getInstanceFromEnv().getQueryScanCountCacheThreshold();
-                if (!sqlResponse.getIsException() && (queryRealExecutionTime > durationThreshold || sqlResponse.getTotalScanCount() > scancountThreshold)) {
-                    cacheManager.getCache(SUCCESS_QUERY_CACHE).put(new Element(sqlRequest, sqlResponse));
-                }
             }
 
             checkQueryAuth(sqlResponse);
-
             return sqlResponse;
         } catch (AccessDeniedException ade) {
             // Access exception is bind with each user, it will not be cached
             logger.error("Exception when execute sql", ade);
             throw new ForbiddenException(ade.getLocalizedMessage());
-        } catch (Throwable e) { // calcite may throw AssertError
+        } catch (ScanOutOfLimitException e) {
+
+            //for exception queries, only cache ScanOutOfLimitException
             SQLResponse exceptionRes = new SQLResponse(null, null, 0, true, e.getMessage());
             Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE);
             exceptionCache.put(new Element(sqlRequest, exceptionRes));
 
             logger.error("Exception when execute sql", e);
             throw new InternalErrorException(QueryUtil.makeErrorMsgUserFriendly(e.getLocalizedMessage()));
+
+        } catch (Throwable e) { // calcite may throw AssertError
+            logger.error("Exception when execute sql", e);
+            throw new InternalErrorException(QueryUtil.makeErrorMsgUserFriendly(e.getLocalizedMessage()));
         }
     }
 
     private SQLResponse searchQueryInCache(SQLRequest sqlRequest) {
         SQLResponse response = null;
         Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE);
-        Cache queryCache = cacheManager.getCache(SUCCESS_QUERY_CACHE);
 
         if (KylinConfig.getInstanceFromEnv().isQueryCacheEnabled() && null != exceptionCache.get(sqlRequest)) {
             Element element = exceptionCache.get(sqlRequest);
             response = (SQLResponse) element.getObjectValue();
             response.setHitCache(true);
-        } else if (KylinConfig.getInstanceFromEnv().isQueryCacheEnabled() && null != queryCache.get(sqlRequest)) {
-            Element element = queryCache.get(sqlRequest);
-            response = (SQLResponse) element.getObjectValue();
-            response.setHitCache(true);
         }
 
         return response;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c7495de9/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
index 0ae9376..c9eb150 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
@@ -23,6 +23,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
+import net.sf.ehcache.CacheManager;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeDescManager;
@@ -40,11 +41,9 @@ import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.query.enumerator.OLAPQuery;
 import org.apache.kylin.query.relnode.OLAPContext;
 import org.apache.kylin.query.schema.OLAPSchemaFactory;
-import org.apache.kylin.rest.controller.QueryController;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.cache.annotation.CacheEvict;
-import org.springframework.cache.annotation.Caching;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.jdbc.datasource.DriverManagerDataSource;
 
 import javax.sql.DataSource;
@@ -68,6 +67,9 @@ public abstract class BasicService {
 
     private static ConcurrentMap<String, DataSource> olapDataSources = new ConcurrentHashMap<String, DataSource>();
 
+    @Autowired
+    private CacheManager cacheManager;
+
     //    @Autowired
     //    protected JdbcTemplate jdbcTemplate;
 
@@ -75,6 +77,12 @@ public abstract class BasicService {
         return KylinConfig.getInstanceFromEnv();
     }
 
+    /**
+     * Removes every entry from the cache registered under the given name.
+     * Does nothing when no cache manager was injected or when no cache with
+     * that name exists.
+     *
+     * @param storageUUID name of the cache to clear; the callers visible in this
+     *                    change pass a cube's UUID
+     */
+    protected void cleanDataCache(String storageUUID) {
+        if (cacheManager == null) {
+            return;
+        }
+        // Look the cache up once so the null check and removeAll() act on the same instance.
+        net.sf.ehcache.Cache storageCache = cacheManager.getCache(storageUUID);
+        if (storageCache != null) {
+            storageCache.removeAll();
+        }
+    }
+
     public void removeOLAPDataSource(String project) {
         logger.info("removeOLAPDataSource is called for project " + project);
         if (StringUtils.isEmpty(project))
@@ -125,18 +133,6 @@ public abstract class BasicService {
         return ret;
     }
 
-    /**
-     * Reload changed cube into cache
-     * 
-     * @throws IOException
-     */
-    @Caching(evict = { @CacheEvict(value = QueryController.SUCCESS_QUERY_CACHE, allEntries = true), @CacheEvict(value = QueryController.EXCEPTION_QUERY_CACHE, allEntries = true) })
-    public void cleanDataCache() {
-        CubeManager.clearCache();
-        ProjectManager.clearCache();
-        removeAllOLAPDataSources();
-    }
-
     public final KylinConfig getKylinConfig() {
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c7495de9/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
index 9b89932..da710d4 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -18,28 +18,27 @@
 
 package org.apache.kylin.rest.service;
 
+import com.google.common.base.Preconditions;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.cache.CacheUpdater;
+import org.apache.kylin.common.restclient.AbstractRestCache;
 import org.apache.kylin.common.restclient.Broadcaster;
 import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.invertedindex.IIDescManager;
-import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.job.cube.CubingJob;
 import org.apache.kylin.job.cube.CubingJobBuilder;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.rest.constant.Constant;
-import org.apache.kylin.rest.controller.QueryController;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.cache.annotation.CacheEvict;
-import org.springframework.cache.annotation.Caching;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.PostConstruct;
 import java.io.IOException;
 
 /**
@@ -47,18 +46,30 @@ import java.io.IOException;
 @Component("cacheService")
 public class CacheService extends BasicService {
 
-    private static final Logger logger = LoggerFactory.getLogger(CacheService.class);
-
     @Autowired
-    private JobService jobService;
+    private CacheUpdater cacheUpdater;
+
+    @PostConstruct // lifecycle callback: invoked after the @Autowired cacheUpdater field has been injected
+    public void init() throws IOException {
+        initCacheUpdater(cacheUpdater); // hands the injected updater to AbstractRestCache
+    }
+
+    /**
+     * Installs the given updater through the static
+     * {@code AbstractRestCache.setCacheUpdater} hook.
+     *
+     * @param cacheUpdater the updater to install; must not be null
+     * @throws NullPointerException if {@code cacheUpdater} is null
+     */
+    public void initCacheUpdater(CacheUpdater cacheUpdater) {
+        // The object being checked is the CacheUpdater, so the failure message names it.
+        Preconditions.checkNotNull(cacheUpdater, "cacheUpdater is not injected yet");
+        AbstractRestCache.setCacheUpdater(cacheUpdater);
+    }
+
+    private static final Logger logger = LoggerFactory.getLogger(CacheService.class);
 
-    @Caching(evict = { @CacheEvict(value = QueryController.SUCCESS_QUERY_CACHE, allEntries = true), @CacheEvict(value = QueryController.EXCEPTION_QUERY_CACHE, allEntries = true) })
     public void rebuildCache(Broadcaster.TYPE cacheType, String cacheKey) {
         final String log = "rebuild cache type: " + cacheType + " name:" + cacheKey;
         try {
             switch (cacheType) {
             case CUBE:
-                getCubeManager().reloadCubeLocal(cacheKey);
+                CubeInstance newCube = getCubeManager().reloadCubeLocal(cacheKey);
+                //clean query related cache first
+                super.cleanDataCache(newCube.getUuid());
+                //move this logic to other place
                 mergeCubeOnNewSegmentReady(cacheKey);
                 break;
             case CUBE_DESC:
@@ -69,6 +80,7 @@ public class CacheService extends BasicService {
                 removeOLAPDataSource(projectInstance.getName());
                 break;
             case INVERTED_INDEX:
+                //II update does not need to update storage cache because it is dynamic already
                 getIIManager().reloadIILocal(cacheKey);
                 break;
             case INVERTED_INDEX_DESC:
@@ -84,15 +96,6 @@ public class CacheService extends BasicService {
                 IIDescManager.clearCache();
                 CubeDescManager.clearCache();
                 break;
-            case ALL:
-                getMetadataManager().reload();
-                CubeDescManager.clearCache();
-                CubeManager.clearCache();
-                IIDescManager.clearCache();
-                IIManager.clearCache();
-                ProjectManager.clearCache();
-                removeAllOLAPDataSources();
-                break;
             default:
                 throw new RuntimeException("invalid cacheType:" + cacheType);
             }
@@ -101,13 +104,14 @@ public class CacheService extends BasicService {
         }
     }
 
-    @Caching(evict = { @CacheEvict(value = QueryController.SUCCESS_QUERY_CACHE, allEntries = true), @CacheEvict(value = QueryController.EXCEPTION_QUERY_CACHE, allEntries = true) })
     public void removeCache(Broadcaster.TYPE cacheType, String cacheKey) {
         final String log = "remove cache type: " + cacheType + " name:" + cacheKey;
         try {
             switch (cacheType) {
             case CUBE:
+                String storageUUID = getCubeManager().getCube(cacheKey).getUuid();
                 getCubeManager().removeCubeLocal(cacheKey);
+                super.cleanDataCache(storageUUID);
                 break;
             case CUBE_DESC:
                 getCubeDescManager().removeLocalCubeDesc(cacheKey);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c7495de9/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
index ba62bb2..08e4112 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -46,7 +46,6 @@ import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.metadata.util.HiveSourceTableLoader;
 import org.apache.kylin.rest.constant.Constant;
-import org.apache.kylin.rest.controller.QueryController;
 import org.apache.kylin.rest.exception.InternalErrorException;
 import org.apache.kylin.rest.request.MetricsRequest;
 import org.apache.kylin.rest.response.HBaseResponse;
@@ -55,8 +54,6 @@ import org.apache.kylin.rest.security.AclPermission;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.cache.annotation.CacheEvict;
-import org.springframework.cache.annotation.Caching;
 import org.springframework.security.access.prepost.PostFilter;
 import org.springframework.security.access.prepost.PreAuthorize;
 import org.springframework.security.core.context.SecurityContextHolder;
@@ -234,6 +231,7 @@ public class CubeService extends BasicService {
 
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'MANAGEMENT')")
     public CubeDesc updateCubeAndDesc(CubeInstance cube, CubeDesc desc, String newProjectName) throws IOException, JobException {
+
         final List<CubingJob> cubingJobs = listAllCubingJobs(cube.getName(), null, EnumSet.of(ExecutableState.READY, ExecutableState.RUNNING));
         if (!cubingJobs.isEmpty()) {
             throw new JobException("Cube schema shouldn't be changed with running job.");
@@ -307,10 +305,12 @@ public class CubeService extends BasicService {
      * @throws JobException
      */
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')")
-    @Caching(evict = { @CacheEvict(value = QueryController.SUCCESS_QUERY_CACHE, allEntries = true), @CacheEvict(value = QueryController.EXCEPTION_QUERY_CACHE, allEntries = true) })
     public CubeInstance purgeCube(CubeInstance cube) throws IOException, JobException {
-        String cubeName = cube.getName();
 
+        //clean query related cache first
+        super.cleanDataCache(cube.getUuid());
+
+        String cubeName = cube.getName();
         RealizationStatusEnum ostatus = cube.getStatus();
         if (null != ostatus && !RealizationStatusEnum.DISABLED.equals(ostatus)) {
             throw new InternalErrorException("Only disabled cube can be purged, status of " + cubeName + " is " + ostatus);
@@ -333,8 +333,11 @@ public class CubeService extends BasicService {
      * @throws JobException
      */
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')")
-    @Caching(evict = { @CacheEvict(value = QueryController.SUCCESS_QUERY_CACHE, allEntries = true), @CacheEvict(value = QueryController.EXCEPTION_QUERY_CACHE, allEntries = true) })
     public CubeInstance disableCube(CubeInstance cube) throws IOException, JobException {
+
+        //clean query related cache first
+        super.cleanDataCache(cube.getUuid());
+
         String cubeName = cube.getName();
 
         RealizationStatusEnum ostatus = cube.getStatus();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c7495de9/server/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/JobService.java b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
index 13d51a9..3c150ef 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -264,7 +264,7 @@ public class JobService extends BasicService {
         final CubeSegment segment = cubeInstance.getSegmentById(segmentId);
         if (segment != null && segment.getStatus() == SegmentStatusEnum.NEW) {
             cubeInstance.getSegments().remove(segment);
-            getCubeManager().updateCube(cubeInstance, true);
+            getCubeManager().updateCube(cubeInstance, false);
         }
         getExecutableManager().discardJob(jobId);
         return jobInstance;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c7495de9/server/src/main/java/org/apache/kylin/rest/service/ModelService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/ModelService.java b/server/src/main/java/org/apache/kylin/rest/service/ModelService.java
index 11f2ca0..68a05fb 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/ModelService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/ModelService.java
@@ -36,7 +36,6 @@ import org.springframework.stereotype.Component;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c7495de9/server/src/main/resources/applicationContext.xml
----------------------------------------------------------------------
diff --git a/server/src/main/resources/applicationContext.xml b/server/src/main/resources/applicationContext.xml
index aef311e..b2f5387 100644
--- a/server/src/main/resources/applicationContext.xml
+++ b/server/src/main/resources/applicationContext.xml
@@ -112,17 +112,21 @@
     <!-- Cache Config -->
     <cache:annotation-driven/>
 
+
+
     <bean id="cacheManager" class="org.springframework.cache.ehcache.EhCacheCacheManager"
           p:cacheManager-ref="ehcache"/>
     <beans profile="default">
         <bean id="ehcache"
               class="org.springframework.cache.ehcache.EhCacheManagerFactoryBean"
               p:configLocation="classpath:ehcache.xml" p:shared="true"/>
+        <bean id="cacheUpdater" class="org.apache.kylin.common.cache.RemoteCacheUpdater"/>
     </beans>
     <beans profile="sandbox,testing">
         <bean id="ehcache"
               class="org.springframework.cache.ehcache.EhCacheManagerFactoryBean"
               p:configLocation="classpath:ehcache-test.xml" p:shared="true"/>
+        <bean id="cacheUpdater" class="org.apache.kylin.common.cache.LocalCacheUpdater"/>
     </beans>
 
     <!-- hbase storage/global lock Config -->
@@ -135,4 +139,6 @@
         <bean id="jobLock" class="org.apache.kylin.common.lock.MockJobLock"/>
     </beans>
 
+
+
 </beans>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c7495de9/server/src/main/resources/ehcache-test.xml
----------------------------------------------------------------------
diff --git a/server/src/main/resources/ehcache-test.xml b/server/src/main/resources/ehcache-test.xml
index a0c33f5..43940e0 100644
--- a/server/src/main/resources/ehcache-test.xml
+++ b/server/src/main/resources/ehcache-test.xml
@@ -1,23 +1,23 @@
-<ehcache xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://ehcache.org/ehcache.xsd">
-<cache name="SuccessQueryCache" 
-    eternal="false" 
-    overflowToDisk="false" 
-    timeToIdleSeconds="86400"
-    memoryStoreEvictionPolicy="LRU"
-    maxBytesLocalHeap="1M"
-/>
-<cache name="ExceptionQueryCache" 
-    eternal="false" 
-    overflowToDisk="false" 
-    timeToIdleSeconds="86400"
-    memoryStoreEvictionPolicy="LRU"
-    maxBytesLocalHeap="1M"
-/>
-<cache name="UserCache" 
-    eternal="false" 
-    overflowToDisk="false" 
-    timeToLiveSeconds="10800"
-    memoryStoreEvictionPolicy="LRU"
-    maxBytesLocalHeap="1M"
-/>
+<ehcache maxBytesLocalHeap="10M">
+    <cache name="StorageCache"
+           eternal="false"
+           overflowToDisk="false"
+           timeToIdleSeconds="86400"
+           memoryStoreEvictionPolicy="LRU"
+           maxBytesLocalHeap="1M"
+            />
+    <cache name="ExceptionQueryCache"
+           eternal="false"
+           overflowToDisk="false"
+           timeToIdleSeconds="86400"
+           memoryStoreEvictionPolicy="LRU"
+           maxBytesLocalHeap="1M"
+            />
+    <cache name="UserCache"
+           eternal="false"
+           overflowToDisk="false"
+           timeToLiveSeconds="10800"
+           memoryStoreEvictionPolicy="LRU"
+           maxBytesLocalHeap="1M"
+            />
 </ehcache>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c7495de9/server/src/main/resources/ehcache.xml
----------------------------------------------------------------------
diff --git a/server/src/main/resources/ehcache.xml b/server/src/main/resources/ehcache.xml
index 5bc9770..ece1845 100644
--- a/server/src/main/resources/ehcache.xml
+++ b/server/src/main/resources/ehcache.xml
@@ -1,23 +1,23 @@
-<ehcache xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://ehcache.org/ehcache.xsd">
-<cache name="SuccessQueryCache" 
-    eternal="false" 
-    overflowToDisk="false" 
-    timeToIdleSeconds="86400"
-    memoryStoreEvictionPolicy="LRU"
-    maxBytesLocalHeap="500M"
-/>
-<cache name="ExceptionQueryCache" 
-    eternal="false" 
-    overflowToDisk="false" 
-    timeToIdleSeconds="86400"
-    memoryStoreEvictionPolicy="LRU"
-    maxBytesLocalHeap="100M"
-/>
-<cache name="UserCache" 
-    eternal="false" 
-    overflowToDisk="false" 
-    timeToLiveSeconds="10800"
-    memoryStoreEvictionPolicy="LRU"
-    maxBytesLocalHeap="100M"
-/>
+<ehcache maxBytesLocalHeap="2048M">
+    <cache name="StorageCache"
+           eternal="false"
+           overflowToDisk="false"
+           timeToIdleSeconds="86400"
+           memoryStoreEvictionPolicy="LRU"
+           maxBytesLocalHeap="500M"
+            />
+    <cache name="ExceptionQueryCache"
+           eternal="false"
+           overflowToDisk="false"
+           timeToIdleSeconds="86400"
+           memoryStoreEvictionPolicy="LRU"
+           maxBytesLocalHeap="100M"
+            />
+    <cache name="UserCache"
+           eternal="false"
+           overflowToDisk="false"
+           timeToLiveSeconds="10800"
+           memoryStoreEvictionPolicy="LRU"
+           maxBytesLocalHeap="100M"
+            />
 </ehcache>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c7495de9/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
index ef834c0..788ce19 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.rest.service;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.cache.RemoteCacheUpdater;
 import org.apache.kylin.common.restclient.Broadcaster;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.CubeDescManager;
@@ -74,21 +75,26 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
         context.setContextPath("/");
         server.setHandler(context);
 
+        final CacheService serviceA = new CacheService() {
+            @Override
+            public KylinConfig getConfig() {
+                return configA;
+            }
+        };
+        final CacheService serviceB = new CacheService() {
+            @Override
+            public KylinConfig getConfig() {
+                return configB;
+            }
+        };
+
+        serviceA.initCacheUpdater(new RemoteCacheUpdater());
+        serviceB.initCacheUpdater(new RemoteCacheUpdater());
+
         context.addServlet(new ServletHolder(new BroadcasterReceiveServlet(new BroadcasterReceiveServlet.BroadcasterHandler() {
             @Override
             public void handle(String type, String name, String event) {
-                final CacheService serviceA = new CacheService() {
-                    @Override
-                    public KylinConfig getConfig() {
-                        return configA;
-                    }
-                };
-                final CacheService serviceB = new CacheService() {
-                    @Override
-                    public KylinConfig getConfig() {
-                        return configB;
-                    }
-                };
+
                 Broadcaster.TYPE wipeType = Broadcaster.TYPE.getType(type);
                 Broadcaster.EVENT wipeEvent = Broadcaster.EVENT.getEvent(event);
                 final String log = "wipe cache type: " + wipeType + " event:" + wipeEvent + " name:" + name;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c7495de9/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java b/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
index 5fc757a..0d720ab 100644
--- a/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
+++ b/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
@@ -40,7 +40,7 @@ import com.google.common.base.Preconditions;
  * @author xjiang
  */
 public class StorageEngineFactory {
-    private static boolean allowStorageLayerCache = false;
+    private static boolean allowStorageLayerCache = true;
 
     public static IStorageEngine getStorageEngine(IRealization realization) {
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c7495de9/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
index 8e1b50a..dbd719d 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
@@ -4,8 +4,10 @@ import net.sf.ehcache.Cache;
 import net.sf.ehcache.CacheManager;
 import net.sf.ehcache.config.CacheConfiguration;
 import net.sf.ehcache.config.Configuration;
+import net.sf.ehcache.config.MemoryUnit;
 import net.sf.ehcache.config.PersistenceConfiguration;
 import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
+
 import org.apache.kylin.metadata.realization.StreamSQLDigest;
 import org.apache.kylin.metadata.tuple.TeeTupleItrListener;
 import org.apache.kylin.storage.ICachableStorageEngine;
@@ -18,39 +20,63 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class AbstractCacheFledgedStorageEngine implements IStorageEngine, TeeTupleItrListener {
     private static final Logger logger = LoggerFactory.getLogger(AbstractCacheFledgedStorageEngine.class);
-    protected static CacheManager cacheManager;
+    private static final String storageCacheTemplate = "StorageCache";
 
-    static {
-        Configuration conf = new Configuration();
-        conf.setMaxBytesLocalHeap("1024M");
-        cacheManager = CacheManager.create(conf);
-    }
+    protected static CacheManager CACHE_MANAGER;
 
-    protected final ICachableStorageEngine underlyingStorage;
-    protected StreamSQLDigest streamSQLDigest;
     protected boolean queryCacheExists;
+    protected ICachableStorageEngine underlyingStorage;
+    protected StreamSQLDigest streamSQLDigest;
 
     public AbstractCacheFledgedStorageEngine(ICachableStorageEngine underlyingStorage) {
         this.underlyingStorage = underlyingStorage;
-        this.queryCacheExists = false;
         this.makeCacheIfNecessary(underlyingStorage.getStorageUUID());
     }
 
-    private void makeCacheIfNecessary(String realizationUUID) {
-        if (cacheManager.getCache(realizationUUID) == null) {
-            logger.info("Cache for {} initting...", realizationUUID);
+    public static void setCacheManager(CacheManager cacheManager) { // replaces the static CACHE_MANAGER; if it is still null, makeCacheIfNecessary falls back to initCacheManger()
+        CACHE_MANAGER = cacheManager;
+    }
+
+    /**
+     * Creates a default CacheManager (128M heap limit) and registers a
+     * "StorageCache" template cache in it. Called by makeCacheIfNecessary
+     * when CACHE_MANAGER is still null.
+     */
+    private static void initCacheManger() {
+        Configuration managerConf = new Configuration();
+        managerConf.setMaxBytesLocalHeap("128M");
+        CACHE_MANAGER = CacheManager.create(managerConf);
+
+        // a fake template for test cases, configured step by step in the original order
+        CacheConfiguration templateConf = new CacheConfiguration(storageCacheTemplate, 0);
+        templateConf.memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU);
+        templateConf.eternal(false);
+        templateConf.timeToIdleSeconds(86400);
+        templateConf.diskExpiryThreadIntervalSeconds(0);
+        templateConf.maxBytesLocalHeap(10, MemoryUnit.MEGABYTES);
+        templateConf.persistence(new PersistenceConfiguration().strategy(PersistenceConfiguration.Strategy.NONE));
+
+        CACHE_MANAGER.addCache(new Cache(templateConf));
+    }
+
+    private void makeCacheIfNecessary(String storageUUID) {
+        if (CACHE_MANAGER == null) {
+            logger.warn("CACHE_MANAGER is not provided");
+            initCacheManger();
+        }
+
+        if (CACHE_MANAGER.getCache(storageUUID) == null) {
+            logger.info("Cache for {} initting...", storageUUID);
+
             // TODO: L4J [2015-04-20 10:44:03,817][WARN][net.sf.ehcache.pool.sizeof.ObjectGraphWalker] - The configured limit of 1,000 object references was reached while attempting to calculate the size of the object graph. Severe performance degradation could occur if the sizing operation continues. This can be avoided by setting the CacheManger or Cache <sizeOfPolicy> elements maxDepthExceededBehavior to "abort" or adding stop points with @IgnoreSizeOf annotations. If performance degradation is NOT an issue at the configured limit, raise the limit value using the CacheManager or Cache <sizeOfPolicy
             //Create a Cache specifying its configuration.
-            Cache storageCache = new Cache(new CacheConfiguration(realizationUUID, 0).//
-                    memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU).//
-                    eternal(false).//
-                    timeToIdleSeconds(86400).//
-                    diskExpiryThreadIntervalSeconds(0).//
-                    //maxBytesLocalHeap(256, MemoryUnit.MEGABYTES).//already defined at manager scope
-                    persistence(new PersistenceConfiguration().strategy(PersistenceConfiguration.Strategy.NONE)));
+            CacheConfiguration templateConf = CACHE_MANAGER.getCache(storageCacheTemplate).getCacheConfiguration();
+            PersistenceConfiguration pconf = templateConf.getPersistenceConfiguration();
+            logger.info("PersistenceConfiguration strategy: " + pconf.getStrategy());
+
+            Cache storageCache = new Cache(new CacheConfiguration(storageUUID, (int) templateConf.getMaxEntriesLocalHeap()).//
+                    memoryStoreEvictionPolicy(templateConf.getMemoryStoreEvictionPolicy()).//
+                    eternal(templateConf.isEternal()).//
+                    timeToIdleSeconds(templateConf.getTimeToIdleSeconds()).//
+                    maxBytesLocalHeap(templateConf.getMaxBytesLocalHeap(), MemoryUnit.BYTES).persistence(pconf));
             //TODO: deal with failed queries, and only cache too long query
 
-            cacheManager.addCache(storageCache);
+            CACHE_MANAGER.addCache(storageCache);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c7495de9/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicStorageEngine.java
index f62551b..d54c5e5 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicStorageEngine.java
@@ -36,7 +36,7 @@ public class CacheFledgedDynamicStorageEngine extends AbstractCacheFledgedStorag
         this.partitionColRef = partitionColRef;
 
         Preconditions.checkArgument(this.partitionColRef != null, "For dynamic columns like " + //
-                this.underlyingStorage.getStorageUUID()+ ", partition column must be provided");
+                this.underlyingStorage.getStorageUUID() + ", partition column must be provided");
     }
 
     @Override
@@ -47,7 +47,7 @@ public class CacheFledgedDynamicStorageEngine extends AbstractCacheFledgedStorag
 
         streamSQLDigest = new StreamSQLDigest(sqlDigest, partitionColRef);
         StreamSQLResult cachedResult = null;
-        Cache cache = cacheManager.getCache(this.underlyingStorage.getStorageUUID());
+        Cache cache = CACHE_MANAGER.getCache(this.underlyingStorage.getStorageUUID());
         Element element = cache.get(streamSQLDigest);
         if (element != null) {
             this.queryCacheExists = true;
@@ -117,8 +117,9 @@ public class CacheFledgedDynamicStorageEngine extends AbstractCacheFledgedStorag
         }
     }
 
-   @Override
-    public void notify(List<ITuple> duplicated) {
+    @Override
+    public void notify(List<ITuple> duplicated,long createTime) {
+
         Range<Long> cacheExclude = this.underlyingStorage.getVolatilePeriod();
         if (cacheExclude != null) {
             List<Range<Long>> cachablePeriods = RangeUtil.remove(ts, cacheExclude);
@@ -133,7 +134,7 @@ public class CacheFledgedDynamicStorageEngine extends AbstractCacheFledgedStorag
         }
 
         StreamSQLResult newCacheEntry = new StreamSQLResult(duplicated, ts, partitionColRef);
-        cacheManager.getCache(this.underlyingStorage.getStorageUUID()).put(new Element(streamSQLDigest, newCacheEntry));
+        CACHE_MANAGER.getCache(this.underlyingStorage.getStorageUUID()).put(new Element(streamSQLDigest, newCacheEntry));
         logger.info("cache after the query: " + newCacheEntry);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c7495de9/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticStorageEngine.java
index 72372c6..7a2b483 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticStorageEngine.java
@@ -20,7 +20,7 @@ import java.util.List;
 /**
  * Created by Hongbin Ma(Binmahone) on 5/11/15.
  */
-public class CacheFledgedStaticStorageEngine extends AbstractCacheFledgedStorageEngine  {
+public class CacheFledgedStaticStorageEngine extends AbstractCacheFledgedStorageEngine {
     private static final Logger logger = LoggerFactory.getLogger(CacheFledgedStaticStorageEngine.class);
 
     public CacheFledgedStaticStorageEngine(ICachableStorageEngine underlyingStorage) {
@@ -32,7 +32,7 @@ public class CacheFledgedStaticStorageEngine extends AbstractCacheFledgedStorage
 
         streamSQLDigest = new StreamSQLDigest(sqlDigest, null);
         StreamSQLResult cachedResult = null;
-        Cache cache = cacheManager.getCache(this.underlyingStorage.getStorageUUID());
+        Cache cache = CACHE_MANAGER.getCache(this.underlyingStorage.getStorageUUID());
         Element element = cache.get(streamSQLDigest);
         if (element != null) {
             this.queryCacheExists = true;
@@ -64,9 +64,26 @@ public class CacheFledgedStaticStorageEngine extends AbstractCacheFledgedStorage
     }
 
     @Override
-    public void notify(List<ITuple> duplicated) {
-        StreamSQLResult newCacheEntry = new StreamSQLResult(duplicated, Ranges.<Long> all(), null);
-        cacheManager.getCache(this.underlyingStorage.getStorageUUID()).put(new Element(streamSQLDigest, newCacheEntry));
-        logger.info("cache after the query: " + newCacheEntry);
+    public void notify(List<ITuple> duplicated, long createTime) {
+        boolean cacheIt = true;
+        //        long storageQueryTime = System.currentTimeMillis() - createTime;
+        //        long durationThreshold = KylinConfig.getInstanceFromEnv().getQueryDurationCacheThreshold();
+        //        long scancountThreshold = KylinConfig.getInstanceFromEnv().getQueryScanCountCacheThreshold();
+        //
+        //        if (storageQueryTime < durationThreshold) {
+        //            logger.info("Skip storage caching for storage cache because storage query time {} less than {}", storageQueryTime, durationThreshold);
+        //            cacheIt = false;
+        //        }
+        //
+        //        if (duplicated.size() < scancountThreshold) {
+        //            logger.info("Skip storage caching for storage cache because scan count {} less than {}", duplicated.size(), scancountThreshold);
+        //            cacheIt = false;
+        //        }
+        // NOTE: the duration/scan-count threshold checks above are commented out, so cacheIt is never cleared and createTime is not read; every notified result is stored in the cache.
+        if (cacheIt) {
+            StreamSQLResult newCacheEntry = new StreamSQLResult(duplicated, Ranges.<Long> all(), null);
+            CACHE_MANAGER.getCache(this.underlyingStorage.getStorageUUID()).put(new Element(streamSQLDigest, newCacheEntry));
+            logger.info("cache after the query: " + newCacheEntry);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c7495de9/storage/src/test/java/org/apache/kylin/storage/test/StaticCacheTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/test/StaticCacheTest.java b/storage/src/test/java/org/apache/kylin/storage/test/StaticCacheTest.java
index 0133214..11fb70b 100644
--- a/storage/src/test/java/org/apache/kylin/storage/test/StaticCacheTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/test/StaticCacheTest.java
@@ -26,7 +26,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 /**
  * Created by Hongbin Ma(Binmahone) on 5/14/15.
  */
-public class StaticCacheTest {
+public class StaticCacheTest  {
+
 
     @Test
     public void basicTest() {
@@ -76,18 +77,16 @@ public class StaticCacheTest {
         firstIterator.close();
 
         ITupleIterator secondIterator = cacheFledgedStaticStorageEngine.search(context, sqlDigest, tupleInfo);
-        IdentityHashMap<ITuple,Void> secondResults = new IdentityHashMap<>();
-        while(secondIterator.hasNext())
-        {
-            secondResults.put(secondIterator.next(),null);
+        IdentityHashMap<ITuple, Void> secondResults = new IdentityHashMap<>();
+        while (secondIterator.hasNext()) {
+            secondResults.put(secondIterator.next(), null);
         }
         secondIterator.close();
 
         ITupleIterator thirdIterator = cacheFledgedStaticStorageEngine.search(context, sqlDigest, tupleInfo);
-        IdentityHashMap<ITuple,Void> thirdResults = new IdentityHashMap<>();
-        while(thirdIterator.hasNext())
-        {
-            thirdResults.put(thirdIterator.next(),null);
+        IdentityHashMap<ITuple, Void> thirdResults = new IdentityHashMap<>();
+        while (thirdIterator.hasNext()) {
+            thirdResults.put(thirdIterator.next(), null);
         }
         thirdIterator.close();