You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/09 04:54:43 UTC
[3/3] incubator-kylin git commit: KYLIN-1127 Adopt listener pattern to wipe query cache on cube update
KYLIN-1127 Adopt listener pattern to wipe query cache on cube update
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/a3397d04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/a3397d04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/a3397d04
Branch: refs/heads/2.x-staging
Commit: a3397d044d6cd333d8f9d94bfd75aa603180d67e
Parents: 2f4595a
Author: Yang Li <li...@apache.org>
Authored: Mon Nov 9 11:53:43 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Mon Nov 9 11:53:43 2015 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/cube/CubeManager.java | 34 ++++++++--
.../kylin/storage/hybrid/HybridManager.java | 4 +-
.../apache/kylin/rest/service/CacheService.java | 68 +++++++++++++++-----
.../kylin/rest/service/CacheServiceTest.java | 2 +
4 files changed, 85 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a3397d04/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index a249cd7..89873d2 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -236,6 +236,9 @@ public class CubeManager implements IRealizationProvider {
// delete cube from project
ProjectManager.getInstance(config).removeRealizationsFromProjects(RealizationType.CUBE, cubeName);
+
+ if (listener != null)
+ listener.afterCubeDelete(cube);
return cube;
}
@@ -248,14 +251,22 @@ public class CubeManager implements IRealizationProvider {
CubeInstance cube = CubeInstance.create(cubeName, projectName, desc);
cube.setOwner(owner);
- updateCube(new CubeUpdate(cube));
+ updateCubeWithRetry(new CubeUpdate(cube), 0);
ProjectManager.getInstance(config).moveRealizationToProject(RealizationType.CUBE, cubeName, projectName, owner);
+ if (listener != null)
+ listener.afterCubeCreate(cube);
+
return cube;
}
public CubeInstance updateCube(CubeUpdate update) throws IOException {
- return updateCube(update, 0);
+ CubeInstance cube = updateCubeWithRetry(update, 0);
+
+ if (listener != null)
+ listener.afterCubeUpdate(cube);
+
+ return cube;
}
private boolean validateReadySegments(CubeInstance cube) {
@@ -283,7 +294,7 @@ public class CubeManager implements IRealizationProvider {
return true;
}
- private CubeInstance updateCube(CubeUpdate update, int retry) throws IOException {
+ private CubeInstance updateCubeWithRetry(CubeUpdate update, int retry) throws IOException {
if (update == null || update.getCubeInstance() == null)
throw new IllegalStateException();
@@ -348,7 +359,7 @@ public class CubeManager implements IRealizationProvider {
cube = reloadCubeLocal(cube.getName());
update.setCubeInstance(cube);
retry++;
- cube = updateCube(update, retry);
+ cube = updateCubeWithRetry(update, retry);
}
if (toRemoveResources.size() > 0) {
@@ -850,4 +861,19 @@ public class CubeManager implements IRealizationProvider {
return getCube(name);
}
+ // ============================================================================
+
+ public interface CubeChangeListener {
+ void afterCubeCreate(CubeInstance cube);
+
+ void afterCubeUpdate(CubeInstance cube);
+
+ void afterCubeDelete(CubeInstance cube);
+ }
+
+ private CubeChangeListener listener;
+
+ public void setCubeChangeListener(CubeChangeListener listener) {
+ this.listener = listener;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a3397d04/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
index 9392ef5..5f16b6b 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
@@ -98,7 +98,7 @@ public class HybridManager implements IRealizationProvider {
logger.debug("Loaded " + paths.size() + " Hybrid(s)");
}
- public void reloadHybridInstanceByChild(RealizationType type, String realizationName) throws IOException {
+ public void reloadHybridInstanceByChild(RealizationType type, String realizationName) {
for (HybridInstance hybridInstance : hybridMap.values()) {
boolean includes = false;
for (IRealization realization : hybridInstance.getRealizations()) {
@@ -113,7 +113,7 @@ public class HybridManager implements IRealizationProvider {
}
}
- private synchronized HybridInstance loadHybridInstance(String path) throws IOException {
+ private synchronized HybridInstance loadHybridInstance(String path) {
ResourceStore store = getStore();
HybridInstance hybridInstance = null;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a3397d04/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
index f9c3ec1..8371907 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -26,6 +26,7 @@ import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import javax.annotation.PostConstruct;
import javax.sql.DataSource;
import net.sf.ehcache.CacheManager;
@@ -72,6 +73,28 @@ public class CacheService extends BasicService {
@Autowired
private CacheManager cacheManager;
+ @PostConstruct
+ public void initCubeChangeListener() throws IOException {
+ CubeManager cubeMgr = CubeManager.getInstance(getConfig());
+ cubeMgr.setCubeChangeListener(new CubeManager.CubeChangeListener() {
+
+ @Override
+ public void afterCubeCreate(CubeInstance cube) {
+ // no cache need change
+ }
+
+ @Override
+ public void afterCubeUpdate(CubeInstance cube) {
+ rebuildCubeCache(cube.getName());
+ }
+
+ @Override
+ public void afterCubeDelete(CubeInstance cube) {
+ removeCubeCache(cube.getName(), cube);
+ }
+ });
+ }
+
// for test
public void setCubeService(CubeService cubeService) {
this.cubeService = cubeService;
@@ -151,14 +174,7 @@ public class CacheService extends BasicService {
try {
switch (cacheType) {
case CUBE:
- CubeInstance newCube = getCubeManager().reloadCubeLocal(cacheKey);
- getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, cacheKey);
- getProjectManager().clearL2Cache();
- //clean query related cache first
- if (newCube != null) {
- cleanDataCache(newCube.getUuid());
- }
- cubeService.updateOnNewSegmentReady(cacheKey);
+ rebuildCubeCache(cacheKey);
break;
case STREAMING:
getStreamingManager().reloadStreamingConfigLocal(cacheKey);
@@ -170,10 +186,8 @@ public class CacheService extends BasicService {
getCubeDescManager().reloadCubeDescLocal(cacheKey);
break;
case PROJECT:
- ProjectInstance projectInstance = getProjectManager().reloadProjectLocal(cacheKey);
- if (projectInstance != null) {
- removeOLAPDataSource(projectInstance.getName());
- }
+ getProjectManager().reloadProjectLocal(cacheKey);
+ removeOLAPDataSource(cacheKey);
break;
case INVERTED_INDEX:
//II update does not need to update storage cache because it is dynamic already
@@ -217,16 +231,23 @@ public class CacheService extends BasicService {
}
}
+ private void rebuildCubeCache(String cubeName) {
+ CubeInstance cube = getCubeManager().reloadCubeLocal(cubeName);
+ getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, cubeName);
+ getProjectManager().clearL2Cache();
+ //clean query related cache first
+ if (cube != null) {
+ cleanDataCache(cube.getUuid());
+ }
+ cubeService.updateOnNewSegmentReady(cubeName);
+ }
+
public void removeCache(Broadcaster.TYPE cacheType, String cacheKey) {
final String log = "remove cache type: " + cacheType + " name:" + cacheKey;
try {
switch (cacheType) {
case CUBE:
- CubeInstance cube = getCubeManager().getCube(cacheKey);
- getCubeManager().removeCubeLocal(cacheKey);
- if (cube != null) {
- cleanDataCache(cube.getUuid());
- }
+ removeCubeCache(cacheKey, null);
break;
case CUBE_DESC:
getCubeDescManager().removeLocalCubeDesc(cacheKey);
@@ -251,4 +272,17 @@ public class CacheService extends BasicService {
throw new RuntimeException("error " + log, e);
}
}
+
+ private void removeCubeCache(String cubeName, CubeInstance cube) {
+ // you may not get the cube instance if it's already removed from metadata
+ if (cube == null) {
+ cube = getCubeManager().getCube(cubeName);
+ }
+
+ getCubeManager().removeCubeLocal(cubeName);
+
+ if (cube != null) {
+ cleanDataCache(cube.getUuid());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a3397d04/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
index b90c10a..25b131a 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
@@ -108,7 +108,9 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
};
serviceA.setCubeService(cubeServiceA);
+ serviceA.initCubeChangeListener();
serviceB.setCubeService(cubeServiceB);
+ serviceB.initCubeChangeListener();
context.addServlet(new ServletHolder(new BroadcasterReceiveServlet(new BroadcasterReceiveServlet.BroadcasterHandler() {
@Override