You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2020/02/07 08:04:31 UTC
[kylin] 04/15: Fix synchronization on boxed types or strings
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 79207d8961e868a850bb6014cba68c45b1ad7b24
Author: nichunen <ni...@apache.org>
AuthorDate: Wed Jan 15 14:51:34 2020 +0800
Fix synchronization on boxed types or strings
---
.../common/persistence/JDBCResourceStore.java | 47 +++++++++++++---------
.../apache/kylin/dict/lookup/SnapshotManager.java | 44 ++++++++++++--------
2 files changed, 54 insertions(+), 37 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
index 2874bbd..15d6de5 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
@@ -32,6 +32,7 @@ import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.fs.FileSystem;
import org.apache.kylin.common.KylinConfig;
@@ -44,28 +45,17 @@ import com.google.common.base.Preconditions;
public class JDBCResourceStore extends PushdownResourceStore {
- private static Logger logger = LoggerFactory.getLogger(JDBCResourceStore.class);
-
public static final String JDBC_SCHEME = "jdbc";
-
+ private static final ConcurrentHashMap<String, Object> lockObjectMap = new ConcurrentHashMap<>();
private static final String META_TABLE_KEY = "META_TABLE_KEY";
-
private static final String META_TABLE_TS = "META_TABLE_TS";
-
private static final String META_TABLE_CONTENT = "META_TABLE_CONTENT";
-
- public static void checkScheme(StorageURL url) {
- Preconditions.checkState(JDBC_SCHEME.equals(url.getScheme()));
- }
-
- // ============================================================================
-
+ private static Logger logger = LoggerFactory.getLogger(JDBCResourceStore.class);
private JDBCConnectionManager connectionManager;
+ // ============================================================================
private String[] tableNames = new String[2];
-
private String metadataIdentifier = null;
-
// For test
private long queriedSqlNum = 0;
@@ -82,11 +72,21 @@ public class JDBCResourceStore extends PushdownResourceStore {
}
}
- abstract static class SqlOperation {
- PreparedStatement pstat = null;
- ResultSet rs = null;
+ public static void checkScheme(StorageURL url) {
+ Preconditions.checkState(JDBC_SCHEME.equals(url.getScheme()));
+ }
- abstract public void execute(final Connection connection) throws SQLException, IOException;
+ private Object getConcurrentObject(String resPath) {
+ if (!lockObjectMap.containsKey(resPath)) {
+ addObject(resPath);
+ }
+ return lockObjectMap.get(resPath);
+ }
+
+ private synchronized void addObject(String resPath) {
+ if (!lockObjectMap.containsKey(resPath)) {
+ lockObjectMap.put(resPath, new Object());
+ }
}
private void executeSql(SqlOperation operation) throws SQLException, IOException {
@@ -349,7 +349,7 @@ public class JDBCResourceStore extends PushdownResourceStore {
@Override
public void execute(Connection connection) throws SQLException, IOException {
byte[] bytes = content.extractAllBytes();
- synchronized (resPath.intern()) {
+ synchronized (getConcurrentObject(resPath)) {
JDBCResourceSQL sqls = getJDBCResourceSQL(getMetaTableName(resPath));
boolean existing = existsImpl(resPath);
if (existing) {
@@ -439,7 +439,7 @@ public class JDBCResourceStore extends PushdownResourceStore {
executeSql(new SqlOperation() {
@Override
public void execute(Connection connection) throws SQLException, IOException {
- synchronized (resPath.intern()) {
+ synchronized (getConcurrentObject(resPath)) {
JDBCResourceSQL sqls = getJDBCResourceSQL(getMetaTableName(resPath));
if (!existsImpl(resPath)) {
if (oldTS != 0) {
@@ -651,4 +651,11 @@ public class JDBCResourceStore extends PushdownResourceStore {
return "/".equals(path);
}
+ abstract static class SqlOperation {
+ PreparedStatement pstat = null;
+ ResultSet rs = null;
+
+ abstract public void execute(final Connection connection) throws SQLException, IOException;
+ }
+
}
\ No newline at end of file
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
index 8f68fb0..76a3df9 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
@@ -21,6 +21,7 @@ package org.apache.kylin.dict.lookup;
import java.io.IOException;
import java.util.List;
import java.util.NavigableSet;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -38,8 +39,6 @@ import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
-import com.google.common.collect.Interner;
-import com.google.common.collect.Interners;
import com.google.common.collect.Lists;
/**
@@ -48,23 +47,13 @@ import com.google.common.collect.Lists;
public class SnapshotManager {
private static final Logger logger = LoggerFactory.getLogger(SnapshotManager.class);
-
- public static SnapshotManager getInstance(KylinConfig config) {
- return config.getManager(SnapshotManager.class);
- }
-
- // called by reflection
- static SnapshotManager newInstance(KylinConfig config) throws IOException {
- return new SnapshotManager(config);
- }
-
- // ============================================================================
-
+ private static final ConcurrentHashMap<String, Object> lockObjectMap = new ConcurrentHashMap<>();
private KylinConfig config;
-
// path ==> SnapshotTable
private LoadingCache<String, SnapshotTable> snapshotCache; // resource
+ // ============================================================================
+
private SnapshotManager(KylinConfig config) {
this.config = config;
this.snapshotCache = CacheBuilder.newBuilder().removalListener(new RemovalListener<String, SnapshotTable>() {
@@ -83,6 +72,28 @@ public class SnapshotManager {
});
}
+ public static SnapshotManager getInstance(KylinConfig config) {
+ return config.getManager(SnapshotManager.class);
+ }
+
+ // called by reflection
+ static SnapshotManager newInstance(KylinConfig config) throws IOException {
+ return new SnapshotManager(config);
+ }
+
+ private Object getConcurrentObject(String resPath) {
+ if (!lockObjectMap.containsKey(resPath)) {
+ addObject(resPath);
+ }
+ return lockObjectMap.get(resPath);
+ }
+
+ private synchronized void addObject(String resPath) {
+ if (!lockObjectMap.containsKey(resPath)) {
+ lockObjectMap.put(resPath, new Object());
+ }
+ }
+
public void wipeoutCache() {
snapshotCache.invalidateAll();
}
@@ -127,9 +138,8 @@ public class SnapshotManager {
throws IOException {
SnapshotTable snapshot = new SnapshotTable(table, tableDesc.getIdentity());
snapshot.updateRandomUuid();
- Interner<String> pool = Interners.newWeakInterner();
- synchronized (pool.intern(tableDesc.getIdentity())) {
+ synchronized (getConcurrentObject(tableDesc.getIdentity())) {
SnapshotTable reusableSnapshot = getReusableSnapShot(table, snapshot, tableDesc, cubeConfig);
if (reusableSnapshot != null)
return updateDictLastModifiedTime(reusableSnapshot.getResourcePath());