You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/09 04:54:43 UTC

[3/3] incubator-kylin git commit: KYLIN-1127 Adopt listener pattern to wipe query cache on cube update

KYLIN-1127 Adopt listener pattern to wipe query cache on cube update


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/a3397d04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/a3397d04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/a3397d04

Branch: refs/heads/2.x-staging
Commit: a3397d044d6cd333d8f9d94bfd75aa603180d67e
Parents: 2f4595a
Author: Yang Li <li...@apache.org>
Authored: Mon Nov 9 11:53:43 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Mon Nov 9 11:53:43 2015 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/cube/CubeManager.java | 34 ++++++++--
 .../kylin/storage/hybrid/HybridManager.java     |  4 +-
 .../apache/kylin/rest/service/CacheService.java | 68 +++++++++++++++-----
 .../kylin/rest/service/CacheServiceTest.java    |  2 +
 4 files changed, 85 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a3397d04/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index a249cd7..89873d2 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -236,6 +236,9 @@ public class CubeManager implements IRealizationProvider {
 
         // delete cube from project
         ProjectManager.getInstance(config).removeRealizationsFromProjects(RealizationType.CUBE, cubeName);
+        
+        if (listener != null)
+            listener.afterCubeDelete(cube);
 
         return cube;
     }
@@ -248,14 +251,22 @@ public class CubeManager implements IRealizationProvider {
         CubeInstance cube = CubeInstance.create(cubeName, projectName, desc);
         cube.setOwner(owner);
 
-        updateCube(new CubeUpdate(cube));
+        updateCubeWithRetry(new CubeUpdate(cube), 0);
         ProjectManager.getInstance(config).moveRealizationToProject(RealizationType.CUBE, cubeName, projectName, owner);
 
+        if (listener != null)
+            listener.afterCubeCreate(cube);
+        
         return cube;
     }
 
     public CubeInstance updateCube(CubeUpdate update) throws IOException {
-        return updateCube(update, 0);
+        CubeInstance cube = updateCubeWithRetry(update, 0);
+
+        if (listener != null)
+            listener.afterCubeUpdate(cube);
+        
+        return cube;
     }
 
     private boolean validateReadySegments(CubeInstance cube) {
@@ -283,7 +294,7 @@ public class CubeManager implements IRealizationProvider {
         return true;
     }
 
-    private CubeInstance updateCube(CubeUpdate update, int retry) throws IOException {
+    private CubeInstance updateCubeWithRetry(CubeUpdate update, int retry) throws IOException {
         if (update == null || update.getCubeInstance() == null)
             throw new IllegalStateException();
 
@@ -348,7 +359,7 @@ public class CubeManager implements IRealizationProvider {
             cube = reloadCubeLocal(cube.getName());
             update.setCubeInstance(cube);
             retry++;
-            cube = updateCube(update, retry);
+            cube = updateCubeWithRetry(update, retry);
         }
 
         if (toRemoveResources.size() > 0) {
@@ -850,4 +861,19 @@ public class CubeManager implements IRealizationProvider {
         return getCube(name);
     }
 
+    // ============================================================================
+    
+    public interface CubeChangeListener {
+        void afterCubeCreate(CubeInstance cube);
+
+        void afterCubeUpdate(CubeInstance cube);
+
+        void afterCubeDelete(CubeInstance cube);
+    }
+    
+    private CubeChangeListener listener;
+    
+    public void setCubeChangeListener(CubeChangeListener listener) {
+        this.listener = listener;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a3397d04/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
index 9392ef5..5f16b6b 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
@@ -98,7 +98,7 @@ public class HybridManager implements IRealizationProvider {
         logger.debug("Loaded " + paths.size() + " Hybrid(s)");
     }
 
-    public void reloadHybridInstanceByChild(RealizationType type, String realizationName) throws IOException {
+    public void reloadHybridInstanceByChild(RealizationType type, String realizationName) {
         for (HybridInstance hybridInstance : hybridMap.values()) {
             boolean includes = false;
             for (IRealization realization : hybridInstance.getRealizations()) {
@@ -113,7 +113,7 @@ public class HybridManager implements IRealizationProvider {
         }
     }
 
-    private synchronized HybridInstance loadHybridInstance(String path) throws IOException {
+    private synchronized HybridInstance loadHybridInstance(String path) {
         ResourceStore store = getStore();
 
         HybridInstance hybridInstance = null;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a3397d04/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
index f9c3ec1..8371907 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -26,6 +26,7 @@ import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import javax.annotation.PostConstruct;
 import javax.sql.DataSource;
 
 import net.sf.ehcache.CacheManager;
@@ -72,6 +73,28 @@ public class CacheService extends BasicService {
     @Autowired
     private CacheManager cacheManager;
 
+    @PostConstruct
+    public void initCubeChangeListener() throws IOException {
+        CubeManager cubeMgr = CubeManager.getInstance(getConfig());
+        cubeMgr.setCubeChangeListener(new CubeManager.CubeChangeListener() {
+
+            @Override
+            public void afterCubeCreate(CubeInstance cube) {
+                // no cache needs to change when a cube is created
+            }
+
+            @Override
+            public void afterCubeUpdate(CubeInstance cube) {
+                rebuildCubeCache(cube.getName());
+            }
+
+            @Override
+            public void afterCubeDelete(CubeInstance cube) {
+                removeCubeCache(cube.getName(), cube);
+            }
+        });
+    }
+
     // for test
     public void setCubeService(CubeService cubeService) {
         this.cubeService = cubeService;
@@ -151,14 +174,7 @@ public class CacheService extends BasicService {
         try {
             switch (cacheType) {
             case CUBE:
-                CubeInstance newCube = getCubeManager().reloadCubeLocal(cacheKey);
-                getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, cacheKey);
-                getProjectManager().clearL2Cache();
-                //clean query related cache first
-                if (newCube != null) {
-                    cleanDataCache(newCube.getUuid());
-                }
-                cubeService.updateOnNewSegmentReady(cacheKey);
+                rebuildCubeCache(cacheKey);
                 break;
             case STREAMING:
                 getStreamingManager().reloadStreamingConfigLocal(cacheKey);
@@ -170,10 +186,8 @@ public class CacheService extends BasicService {
                 getCubeDescManager().reloadCubeDescLocal(cacheKey);
                 break;
             case PROJECT:
-                ProjectInstance projectInstance = getProjectManager().reloadProjectLocal(cacheKey);
-                if (projectInstance != null) {
-                    removeOLAPDataSource(projectInstance.getName());
-                }
+                getProjectManager().reloadProjectLocal(cacheKey);
+                removeOLAPDataSource(cacheKey);
                 break;
             case INVERTED_INDEX:
                 //II update does not need to update storage cache because it is dynamic already
@@ -217,16 +231,23 @@ public class CacheService extends BasicService {
         }
     }
 
+    private void rebuildCubeCache(String cubeName) {
+        CubeInstance cube = getCubeManager().reloadCubeLocal(cubeName);
+        getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, cubeName);
+        getProjectManager().clearL2Cache();
+        //clean query related cache first
+        if (cube != null) {
+            cleanDataCache(cube.getUuid());
+        }
+        cubeService.updateOnNewSegmentReady(cubeName);
+    }
+
     public void removeCache(Broadcaster.TYPE cacheType, String cacheKey) {
         final String log = "remove cache type: " + cacheType + " name:" + cacheKey;
         try {
             switch (cacheType) {
             case CUBE:
-                CubeInstance cube = getCubeManager().getCube(cacheKey);
-                getCubeManager().removeCubeLocal(cacheKey);
-                if (cube != null) {
-                    cleanDataCache(cube.getUuid());
-                }
+                removeCubeCache(cacheKey, null);
                 break;
             case CUBE_DESC:
                 getCubeDescManager().removeLocalCubeDesc(cacheKey);
@@ -251,4 +272,17 @@ public class CacheService extends BasicService {
             throw new RuntimeException("error " + log, e);
         }
     }
+
+    private void removeCubeCache(String cubeName, CubeInstance cube) {
+        // you may not get the cube instance if it's already removed from metadata
+        if (cube == null) {
+            cube = getCubeManager().getCube(cubeName);
+        }
+        
+        getCubeManager().removeCubeLocal(cubeName);
+        
+        if (cube != null) {
+            cleanDataCache(cube.getUuid());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a3397d04/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
index b90c10a..25b131a 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
@@ -108,7 +108,9 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
         };
 
         serviceA.setCubeService(cubeServiceA);
+        serviceA.initCubeChangeListener();
         serviceB.setCubeService(cubeServiceB);
+        serviceB.initCubeChangeListener();
 
         context.addServlet(new ServletHolder(new BroadcasterReceiveServlet(new BroadcasterReceiveServlet.BroadcasterHandler() {
             @Override