You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2020/02/07 08:04:31 UTC

[kylin] 04/15: Fix synchronization on boxed types or strings

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 79207d8961e868a850bb6014cba68c45b1ad7b24
Author: nichunen <ni...@apache.org>
AuthorDate: Wed Jan 15 14:51:34 2020 +0800

    Fix synchronization on boxed types or strings
---
 .../common/persistence/JDBCResourceStore.java      | 47 +++++++++++++---------
 .../apache/kylin/dict/lookup/SnapshotManager.java  | 44 ++++++++++++--------
 2 files changed, 54 insertions(+), 37 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
index 2874bbd..15d6de5 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
@@ -32,6 +32,7 @@ import java.sql.SQLException;
 import java.sql.Types;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.kylin.common.KylinConfig;
@@ -44,28 +45,17 @@ import com.google.common.base.Preconditions;
 
 public class JDBCResourceStore extends PushdownResourceStore {
 
-    private static Logger logger = LoggerFactory.getLogger(JDBCResourceStore.class);
-
     public static final String JDBC_SCHEME = "jdbc";
-
+    private static final ConcurrentHashMap<String, Object> lockObjectMap = new ConcurrentHashMap<>();
     private static final String META_TABLE_KEY = "META_TABLE_KEY";
-
     private static final String META_TABLE_TS = "META_TABLE_TS";
-
     private static final String META_TABLE_CONTENT = "META_TABLE_CONTENT";
-
-    public static void checkScheme(StorageURL url) {
-        Preconditions.checkState(JDBC_SCHEME.equals(url.getScheme()));
-    }
-
-    // ============================================================================
-
+    private static Logger logger = LoggerFactory.getLogger(JDBCResourceStore.class);
     private JDBCConnectionManager connectionManager;
 
+    // ============================================================================
     private String[] tableNames = new String[2];
-
     private String metadataIdentifier = null;
-
     // For test
     private long queriedSqlNum = 0;
 
@@ -82,11 +72,21 @@ public class JDBCResourceStore extends PushdownResourceStore {
         }
     }
 
-    abstract static class SqlOperation {
-        PreparedStatement pstat = null;
-        ResultSet rs = null;
+    public static void checkScheme(StorageURL url) {
+        Preconditions.checkState(JDBC_SCHEME.equals(url.getScheme()));
+    }
 
-        abstract public void execute(final Connection connection) throws SQLException, IOException;
+    /** Returns the lock object for resPath shared by all instances, registering it on first use. */
+    private Object getConcurrentObject(String resPath) {
+        addObject(resPath);
+        return lockObjectMap.get(resPath);
+    }
+
+    // Not synchronized: that monitor is per instance, while lockObjectMap is static and shared,
+    // so two stores could both pass a containsKey check and one would overwrite the other's lock.
+    private void addObject(String resPath) {
+        // putIfAbsent is a single atomic check-and-insert; an existing lock is never replaced.
+        lockObjectMap.putIfAbsent(resPath, new Object());
     }
 
     private void executeSql(SqlOperation operation) throws SQLException, IOException {
@@ -349,7 +349,7 @@ public class JDBCResourceStore extends PushdownResourceStore {
             @Override
             public void execute(Connection connection) throws SQLException, IOException {
                 byte[] bytes = content.extractAllBytes();
-                synchronized (resPath.intern()) {
+                synchronized (getConcurrentObject(resPath)) {
                     JDBCResourceSQL sqls = getJDBCResourceSQL(getMetaTableName(resPath));
                     boolean existing = existsImpl(resPath);
                     if (existing) {
@@ -439,7 +439,7 @@ public class JDBCResourceStore extends PushdownResourceStore {
         executeSql(new SqlOperation() {
             @Override
             public void execute(Connection connection) throws SQLException, IOException {
-                synchronized (resPath.intern()) {
+                synchronized (getConcurrentObject(resPath)) {
                     JDBCResourceSQL sqls = getJDBCResourceSQL(getMetaTableName(resPath));
                     if (!existsImpl(resPath)) {
                         if (oldTS != 0) {
@@ -651,4 +651,11 @@ public class JDBCResourceStore extends PushdownResourceStore {
         return "/".equals(path);
     }
 
+    abstract static class SqlOperation {
+        PreparedStatement pstat = null;
+        ResultSet rs = null;
+
+        abstract public void execute(final Connection connection) throws SQLException, IOException;
+    }
+
 }
\ No newline at end of file
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
index 8f68fb0..76a3df9 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
@@ -21,6 +21,7 @@ package org.apache.kylin.dict.lookup;
 import java.io.IOException;
 import java.util.List;
 import java.util.NavigableSet;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -38,8 +39,6 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
-import com.google.common.collect.Interner;
-import com.google.common.collect.Interners;
 import com.google.common.collect.Lists;
 
 /**
@@ -48,23 +47,13 @@ import com.google.common.collect.Lists;
 public class SnapshotManager {
 
     private static final Logger logger = LoggerFactory.getLogger(SnapshotManager.class);
-
-    public static SnapshotManager getInstance(KylinConfig config) {
-        return config.getManager(SnapshotManager.class);
-    }
-
-    // called by reflection
-    static SnapshotManager newInstance(KylinConfig config) throws IOException {
-        return new SnapshotManager(config);
-    }
-
-    // ============================================================================
-
+    private static final ConcurrentHashMap<String, Object> lockObjectMap = new ConcurrentHashMap<>();
     private KylinConfig config;
-
     // path ==> SnapshotTable
     private LoadingCache<String, SnapshotTable> snapshotCache; // resource
 
+    // ============================================================================
+
     private SnapshotManager(KylinConfig config) {
         this.config = config;
         this.snapshotCache = CacheBuilder.newBuilder().removalListener(new RemovalListener<String, SnapshotTable>() {
@@ -83,6 +72,28 @@ public class SnapshotManager {
                 });
     }
 
+    public static SnapshotManager getInstance(KylinConfig config) {
+        return config.getManager(SnapshotManager.class);
+    }
+
+    // called by reflection
+    static SnapshotManager newInstance(KylinConfig config) throws IOException {
+        return new SnapshotManager(config);
+    }
+
+    /** Returns the lock object for resPath shared by all instances, registering it on first use. */
+    private Object getConcurrentObject(String resPath) {
+        addObject(resPath);
+        return lockObjectMap.get(resPath);
+    }
+
+    // Not synchronized: that monitor is per instance, while lockObjectMap is static and shared,
+    // so two managers could both pass a containsKey check and one would overwrite the other's lock.
+    private void addObject(String resPath) {
+        // putIfAbsent is a single atomic check-and-insert; an existing lock is never replaced.
+        lockObjectMap.putIfAbsent(resPath, new Object());
+    }
+
     public void wipeoutCache() {
         snapshotCache.invalidateAll();
     }
@@ -127,9 +138,8 @@ public class SnapshotManager {
             throws IOException {
         SnapshotTable snapshot = new SnapshotTable(table, tableDesc.getIdentity());
         snapshot.updateRandomUuid();
-        Interner<String> pool = Interners.newWeakInterner();
 
-        synchronized (pool.intern(tableDesc.getIdentity())) {
+        synchronized (getConcurrentObject(tableDesc.getIdentity())) {
             SnapshotTable reusableSnapshot = getReusableSnapShot(table, snapshot, tableDesc, cubeConfig);
             if (reusableSnapshot != null)
                 return updateDictLastModifiedTime(reusableSnapshot.getResourcePath());