You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/06/06 17:12:59 UTC

incubator-kylin git commit: KYLIN-807 Add lock to avoid merge conflict; and drop the statistics file from the resource store when dropping a segment.

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 cdba06c45 -> ee53c1643


KYLIN-807 Add lock to avoid merge conflict; and drop the statistics file from the resource store when dropping a segment.

Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/ee53c164
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/ee53c164
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/ee53c164

Branch: refs/heads/0.8.0
Commit: ee53c1643d4ff119e079a2953cee25c69697ddae
Parents: cdba06c
Author: shaofengshi <sh...@apache.org>
Authored: Sat Jun 6 23:12:49 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sat Jun 6 23:12:49 2015 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/cube/CubeManager.java | 18 ++++++++++-
 .../apache/kylin/rest/service/CacheService.java | 32 +++++++++++---------
 2 files changed, 34 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ee53c164/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 839cee7..5fc5584 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -249,9 +249,15 @@ public class CubeManager implements IRealizationProvider {
         if (cubeBuilder.getToAddSegs() != null)
             cube.getSegments().addAll(Arrays.asList(cubeBuilder.getToAddSegs()));
 
-        if (cubeBuilder.getToRemoveSegs() != null)
+        List<String> toRemoveResources = Lists.newArrayList();
+        if (cubeBuilder.getToRemoveSegs() != null) {
             cube.getSegments().removeAll(Arrays.asList(cubeBuilder.getToRemoveSegs()));
 
+            for (CubeSegment toRemoveSeg : cubeBuilder.getToRemoveSegs()) {
+                toRemoveResources.add(toRemoveSeg.getStatisticsResourcePath());
+            }
+        }
+
         if (cubeBuilder.getToUpdateSegs() != null) {
             for (CubeSegment segment : cubeBuilder.getToUpdateSegs()) {
                 for (int i = 0; i < cube.getSegments().size(); i++) {
@@ -293,6 +299,16 @@ public class CubeManager implements IRealizationProvider {
             cube = updateCube(cubeBuilder, retry);
         }
 
+        if (toRemoveResources.size() > 0) {
+            for (String resource : toRemoveResources) {
+                try {
+                    getStore().deleteResource(resource);
+                } catch (IOException ioe) {
+                    logger.error("Failed to delete resource " + toRemoveResources.toString());
+                }
+            }
+        }
+
         cubeMap.put(cube.getName(), cube);
 
         //this is a duplicate call to take care of scenarios where REST cache service unavailable

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ee53c164/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
index 56f9d4b..cc56254 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -159,23 +159,25 @@ public class CacheService extends BasicService {
         logger.debug("server mode: " + serverMode);
         if (Constant.SERVER_MODE_JOB.equals(serverMode.toLowerCase()) || Constant.SERVER_MODE_ALL.equals(serverMode.toLowerCase())) {
             logger.debug("This is the job engine node, will check whether auto merge is needed on cube " + cubeName);
-            CubeInstance cube = getCubeManager().getCube(cubeName);
             CubeSegment newSeg = null;
-            try {
-                newSeg = getCubeManager().autoMergeCubeSegments(cube);
-                if (newSeg != null) {
-                    logger.debug("Will submit merge job on " + newSeg);
-                    CubingJobBuilder builder = new CubingJobBuilder(new JobEngineConfig(getConfig()));
-                    builder.setSubmitter("SYSTEM");
-                    newSeg = getCubeManager().mergeSegments(cube, newSeg.getDateRangeStart(), newSeg.getDateRangeEnd());
-                    CubingJob job = builder.mergeJob(newSeg);
-                    getExecutableManager().addJob(job);
-                } else {
-                    logger.debug("Not ready for merge on cube " + cubeName);
-                }
+            synchronized (getCubeManager().getCube(cubeName)) {
+                CubeInstance cube = getCubeManager().getCube(cubeName);
+                try {
+                    newSeg = getCubeManager().autoMergeCubeSegments(cube);
+                    if (newSeg != null) {
+                        newSeg = getCubeManager().mergeSegments(cube, newSeg.getDateRangeStart(), newSeg.getDateRangeEnd());
+                        logger.debug("Will submit merge job on " + newSeg);
+                        CubingJobBuilder builder = new CubingJobBuilder(new JobEngineConfig(getConfig()));
+                        builder.setSubmitter("SYSTEM");
+                        CubingJob job = builder.mergeJob(newSeg);
+                        getExecutableManager().addJob(job);
+                    } else {
+                        logger.debug("Not ready for merge on cube " + cubeName);
+                    }
 
-            } catch (IOException e) {
-                logger.error("Failed to auto merge cube " + cubeName, e);
+                } catch (IOException e) {
+                    logger.error("Failed to auto merge cube " + cubeName, e);
+                }
             }
         }
     }