You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/06/04 02:06:56 UTC

[kylin] 01/02: KYLIN-3983 Add extra metadata for measure. Fix the deadlock caused by lock order. No need to use different locks in CubeManager and MeasureManager.

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch dynamic-measure
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit fbc5c0d04aa5341e81a02802c652decd71800e36
Author: yuzhang <sh...@163.com>
AuthorDate: Sun May 26 23:48:15 2019 +0800

    KYLIN-3983 Add extra metadata for measure. Fix the deadlock caused by lock order. No need to use different locks in CubeManager and MeasureManager.
---
 .../java/org/apache/kylin/cube/CubeManager.java    |  5 ++
 .../org/apache/kylin/measure/MeasureManager.java   | 78 +++++++++-------------
 .../apache/kylin/measure/MeasureManagerTest.java   |  3 +-
 3 files changed, 39 insertions(+), 47 deletions(-)

diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 6a6f2b4..f83e8db 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -1248,4 +1248,9 @@ public class CubeManager implements IRealizationProvider {
     public MeasureManager getMeasureManager() {
         return MeasureManager.getInstance(this.config);
     }
+
+    public AutoReadWriteLock getLock() {
+        return cubeMapLock;
+    }
+
 }
diff --git a/core-cube/src/main/java/org/apache/kylin/measure/MeasureManager.java b/core-cube/src/main/java/org/apache/kylin/measure/MeasureManager.java
index 5a0fed1..29f33f7 100644
--- a/core-cube/src/main/java/org/apache/kylin/measure/MeasureManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/measure/MeasureManager.java
@@ -85,14 +85,14 @@ public class MeasureManager {
 
     private CachedCrudAssist<MeasureInstance> crud;
 
-    private AutoReadWriteLock measureMapLock = new AutoReadWriteLock();
+    private AutoReadWriteLock cubeMapLock;
 
     private MeasureManager(KylinConfig config) throws IOException {
         LOG.info("Initializing MeasureManager with config " + config);
         this.config = config;
         this.measureMap = new CaseInsensitiveStringCache<>(config, "measure");
         this.cubeL2Cache = new CubeL2Cache(getCubeManager());
-
+        this.cubeMapLock = getCubeManager().getLock();
         this.crud = new CachedCrudAssist<MeasureInstance>(getStore(), ResourceStore.MEASURE_RESOURCE_ROOT, MeasureInstance.class, measureMap) {
             @Override
             protected MeasureInstance initEntityAfterReload(MeasureInstance entity, String resourceName) {
@@ -112,18 +112,10 @@ public class MeasureManager {
 
         @Override
         public void onEntityChange(Broadcaster broadcaster, String entity, Broadcaster.Event event, String cacheKey) throws IOException {
-
-            switch (entity) {
-//                case MEASURE:
-//                    triggerByMeasure(broadcaster, entity, event, cacheKey);
-//                    break;
-//                case CUBE:
-//                case CUBE_DESC:
-                case CUBE_MEASURE:
-                    triggerByCube(broadcaster, entity, event, cacheKey);
-                    break;
-                default :
-                    break;
+            if (entity.equals(CUBE_MEASURE)) {
+                triggerByCube(broadcaster, entity, event, cacheKey);
+            } else {
+                throw new IllegalArgumentException("Only listen on " + CUBE_MEASURE + ", can't process this entity: " + entity);
             }
         }
 
@@ -146,7 +138,7 @@ public class MeasureManager {
     }
 
     private MeasureInstance reloadQuietly(String cacheKey) {
-        try (AutoLock lock = measureMapLock.lockForWrite()) {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
             MeasureInstance ret = crud.reloadQuietly(cacheKey);
             cubeL2Cache.reloadCache(ret.getCubeName());
             return ret;
@@ -154,7 +146,7 @@ public class MeasureManager {
     }
 
     private List<MeasureInstance> reloadByCubeQuietly(String cubeName) throws IOException {
-        try (AutoLock lock = measureMapLock.lockForWrite()) {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
             CubeInstance cube = getCubeManager().getCube(cubeName);
             if (cube == null) {
                 LOG.warn("Can't find cube {} in cache, it's missing or unload yet, so kylin can't reload cache of measures on cube. you can trigger this reload process again by update this cube!");
@@ -178,25 +170,25 @@ public class MeasureManager {
     }
 
     public MeasureInstance getMeasure(String cubeName, String measureName) {
-        try (AutoLock lock = measureMapLock.lockForRead()) {
+        try (AutoLock lock = cubeMapLock.lockForRead()) {
             return getMeasureByKey(MeasureInstance.getResourceName(cubeName, measureName));
         }
     }
 
     public MeasureInstance getMeasureByKey(String key) {
-        try (AutoLock lock = measureMapLock.lockForRead()) {
+        try (AutoLock lock = cubeMapLock.lockForRead()) {
             return measureMap.get(key);
         }
     }
 
     public List<MeasureInstance> getMeasuresOnSegment(String projectName, String cubeName, String segmentName) {
-        try (AutoLock lock = measureMapLock.lockForRead()) {
+        try (AutoLock lock = cubeMapLock.lockForRead()) {
             return cubeL2Cache.getMeasuresOf(cubeName, segmentName);
         }
     }
 
     public List<MeasureInstance> getMeasuresInCube(String projectName, String cubeName) {
-        try (AutoLock lock = measureMapLock.lockForRead()) {
+        try (AutoLock lock = cubeMapLock.lockForRead()) {
             return cubeL2Cache.getMeasuresOnCube(cubeName);
         }
     }
@@ -209,7 +201,7 @@ public class MeasureManager {
      * @return
      */
     public List<MeasureInstance> getMeasuresInCubeAllowMiss(String projectName, String cubeName) {
-        try (AutoLock lock = measureMapLock.lockForRead()) {
+        try (AutoLock lock = cubeMapLock.lockForRead()) {
             // try get don't allow miss
             return cubeL2Cache.getMeasuresOnCube(cubeName);
         } catch (CubeL2Cache.MissMeasureCacheException e) {
@@ -225,7 +217,7 @@ public class MeasureManager {
      * @throws IOException
      */
     public List<MeasureInstance> createMeasuresOnCube(CubeInstance cube) throws IOException {
-        try (AutoLock lock = measureMapLock.lockForWrite()) {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
             boolean existed = cubeL2Cache.getMeasuresOnCubeAllowMiss(cube.getName()).size() != 0;
             List<MeasureDesc> measures = cube.getMeasures();
             List<MeasureInstance> needSaveMeasures = measures.stream()
@@ -238,7 +230,7 @@ public class MeasureManager {
 
     // don't announce
     private MeasureInstance createMeasureAlone(MeasureInstance measureInstance) throws IOException {
-        try (AutoLock lock = measureMapLock.lockForWrite()) {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
             createInStore(measureInstance);
             // save in cache
             return createCache(measureInstance);
@@ -246,14 +238,14 @@ public class MeasureManager {
     }
 
     private void createInStore(MeasureInstance measureInstance) throws IOException {
-        try (AutoLock lock = measureMapLock.lockForWrite()) {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
             // save meta data
             getStore().checkAndPutResource(getResourcePath(measureInstance), measureInstance, MEASURE_SERIALIZER);
         }
     }
 
     private MeasureInstance createCache(MeasureInstance measureInstance) {
-        try (AutoLock lock = measureMapLock.lockForWrite()) {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
             // save in cache
             measureMap.putLocal(measureInstance.getKey(), measureInstance);
             cubeL2Cache.reloadCache(measureInstance.getCubeName());
@@ -262,7 +254,7 @@ public class MeasureManager {
     }
 
     public List<MeasureInstance> batchSaveCubeMeasure(List<MeasureInstance> measures, String cubeName, boolean existed) throws IOException {
-        try (AutoLock lock = measureMapLock.lockForWrite()) {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
             // save in store
             batchSaveCubeMeasureInStore(measures);
 
@@ -275,7 +267,7 @@ public class MeasureManager {
     }
 
     private List<MeasureInstance> batchSaveCubeMeasureInStore(List<MeasureInstance> measures) throws IOException {
-        try (AutoLock lock = measureMapLock.lockForWrite()) {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
             // save in store
             for (MeasureInstance m : measures) {
                 if (null == m.getUuid()) {
@@ -288,7 +280,7 @@ public class MeasureManager {
     }
 
     private List<MeasureInstance> batchSaveCubeMeasureInCache(List<MeasureInstance> measures, String cubeName) throws IOException {
-        try (AutoLock lock = measureMapLock.lockForWrite()) {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
             // save in cache
             for (MeasureInstance m : measures) {
                 measureMap.putLocal(m.getKey(), m);
@@ -302,7 +294,7 @@ public class MeasureManager {
     // ================delete====================
 
     private MeasureInstance deleteInStore(MeasureInstance m) throws IOException {
-        try (AutoLock lock = measureMapLock.lockForWrite()) {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
             if (m.getSegmentsName().size() > 0) {
                 throw new IllegalStateException(String.format(Locale.ROOT, "Can't delete measure %s, please delete the contained segments first. %s", m.getName(), m.getSegmentsName()));
             }
@@ -312,7 +304,7 @@ public class MeasureManager {
     }
 
     private MeasureInstance deleteCache(MeasureInstance m) {
-        try (AutoLock lock = measureMapLock.lockForWrite()) {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
             measureMap.remove(m.getKey());
             cubeL2Cache.reloadCache(m.getCubeName());
             return m;
@@ -320,7 +312,7 @@ public class MeasureManager {
     }
 
     public List<MeasureInstance> deleteByCube(String projectName, String cubeName) throws IOException {
-        try (AutoLock lock = measureMapLock.lockForWrite()) {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
             // delete meta data
             List<MeasureInstance> needDrop = deleteInStoreByCube(projectName, cubeName);
             if (null != needDrop && needDrop.size() > 0) {
@@ -334,7 +326,7 @@ public class MeasureManager {
     }
 
     private List<MeasureInstance> removeLocalByCube(String cubeName) {
-        try (AutoLock lock = measureMapLock.lockForWrite()) {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
             List<MeasureInstance> needRemove = cubeL2Cache.getMeasuresOnCube(cubeName);
             if (needRemove.size() == 0) {
                 // get from L1 cache
@@ -358,7 +350,7 @@ public class MeasureManager {
     }
 
     private MeasureInstance removeL1Cache(String key) {
-        try (AutoLock lock = measureMapLock.lockForWrite()) {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
             MeasureInstance ret = measureMap.get(key);
             if (null != ret) {
                 measureMap.removeLocal(key);
@@ -373,7 +365,7 @@ public class MeasureManager {
      * @return
      */
     private MeasureInstance removeLocal(String key) {
-        try (AutoLock lock = measureMapLock.lockForWrite()) {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
             MeasureInstance ret = removeL1Cache(key);
             if (null != ret) {
                 cubeL2Cache.reloadCache(ret.getCubeName());
@@ -390,7 +382,7 @@ public class MeasureManager {
      * @return
      */
     private List<MeasureInstance> deleteInStoreByCube(String projectName, String cubeName) throws IOException {
-        try (AutoLock lock = measureMapLock.lockForWrite()) {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
             List<MeasureInstance> needDrop = getMeasuresInCube(projectName, cubeName);
             ResourceStore store = getStore();
             for (MeasureInstance m : needDrop) {
@@ -401,7 +393,7 @@ public class MeasureManager {
     }
 
     public void deleteSegmentByName(CubeInstance cube, String segmentName) throws IOException {
-        try (AutoLock lock = measureMapLock.lockForWrite()) {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
             List<MeasureInstance> updated = deleteSegInMeasure(cube, segmentName);
             batchSaveCubeMeasure(updated, cube.getName(), true);
         }
@@ -446,7 +438,7 @@ public class MeasureManager {
      * if segsToDrop is null it's mean delete all;
      */
     public void deleteSegments(CubeInstance cube, ISegment... segsToDrop) throws IOException {
-        try (AutoLock lock = measureMapLock.lockForWrite()) {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
             List<String> needDrop = Lists.newArrayList(segsToDrop).stream().map(ISegment::getName).collect(Collectors.toList());
             deleteSegmentsByName(cube, needDrop);
         }
@@ -463,7 +455,7 @@ public class MeasureManager {
     }
 
     public void deleteAllSegments(CubeInstance cube) throws IOException {
-        try (AutoLock lock = measureMapLock.lockForWrite()) {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
             List<String> needDrop;
             LOG.info("Delete all segments in cube.", cube.getName());
             needDrop = cube.getSegments().stream().map(ISegment::getName).collect(Collectors.toList());
@@ -480,7 +472,7 @@ public class MeasureManager {
      * @return updated measures in cube
      */
     public List<MeasureInstance> updateMeasuresOnCube(CubeDesc cubeDesc) throws IOException {
-        try (AutoLock lock = measureMapLock.lockForWrite()) {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
             List<CubeInstance> cubesByDesc = getCubeManager().getCubesByDesc(cubeDesc.getName());
             List<MeasureInstance> ret = Lists.newArrayList();
             for (CubeInstance cube : cubesByDesc) {
@@ -537,7 +529,7 @@ public class MeasureManager {
     }
 
     private List<MeasureInstance> updateMeasuresOnCube(CubeInstance cube) throws IOException {
-        try (AutoLock lock = measureMapLock.lockForWrite()) {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
             List<MeasureInstance> cachedMeasureOnCube = getMeasuresInCubeAllowMiss(cube.getProject(), cube.getName());
             List<MeasureInstance> needDrop = Lists.newArrayListWithCapacity(5);
             List<MeasureInstance> needAdd = Lists.newArrayListWithCapacity(6);
@@ -579,7 +571,7 @@ public class MeasureManager {
      * @throws IOException
      */
     public List<MeasureInstance> updateMeasures(String project, String cubeName, List<MeasureInstance> updatedMeasures) throws IOException {
-        try (AutoLock lock = measureMapLock.lockForWrite()) {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
             return batchSaveCubeMeasure(updatedMeasures, cubeName, true);
         }
     }
@@ -588,10 +580,6 @@ public class MeasureManager {
         return ResourceStore.MEASURE_RESOURCE_ROOT + "/" + m.resourceName() + MetadataConstants.FILE_SURFIX;
     }
 
-    public AutoReadWriteLock getMeasureMapLock() {
-        return measureMapLock;
-    }
-
     private CubeManager getCubeManager(){
         return CubeManager.getInstance(config);
     }
diff --git a/core-cube/src/test/java/org/apache/kylin/measure/MeasureManagerTest.java b/core-cube/src/test/java/org/apache/kylin/measure/MeasureManagerTest.java
index 22a10f8..908715e 100644
--- a/core-cube/src/test/java/org/apache/kylin/measure/MeasureManagerTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/measure/MeasureManagerTest.java
@@ -32,7 +32,6 @@ import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.HBaseMappingDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.log4j.BasicConfigurator;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -53,7 +52,7 @@ public class MeasureManagerTest extends LocalFileMetadataTestCase {
 
     @Before
     public void setUp() throws Exception {
-        BasicConfigurator.configure();
+        // BasicConfigurator.configure();
         this.createTestMetadata();
     }