You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ja...@apache.org on 2019/02/27 19:29:02 UTC
[geode] branch release/1.9.0 updated: GEODE-6404: work around
possible sync issue with computeIfAbsent (#3196)
This is an automated email from the ASF dual-hosted git repository.
jasonhuynh pushed a commit to branch release/1.9.0
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/release/1.9.0 by this push:
new 40b3d9c GEODE-6404: work around possible sync issue with computeIfAbsent (#3196)
40b3d9c is described below
commit 40b3d9c690257d3e0e9bccfa1677c8adcbdce096
Author: Jason Huynh <hu...@gmail.com>
AuthorDate: Mon Feb 25 18:31:21 2019 +0000
GEODE-6404: work around possible sync issue with computeIfAbsent (#3196)
* GEODE-6404: work around possible synchronization issue with computeIfAbsent
* Tries to do the get() outside of a lock before computing
* Added jmh test
* Local benchmarking showed that although jdk 11 fixes the contention issue that
performing the get was faster than the retrieve mechanism of computeIfAbsent
(cherry picked from commit 2be6375a775b6b0d00d0c41a1e2a3bf4b8745a46)
---
.../jdbc/internal/TableMetaDataManager.java | 3 +-
.../internal/util/ComputeIfAbsentBenchmark.java | 107 +++++++++++++++++++++
.../cache/query/internal/AttributeDescriptor.java | 3 +-
.../internal/InternalDistributedSystem.java | 3 +-
.../membership/gms/messenger/GMSEncrypt.java | 3 +-
.../internal/cache/backup/BackupDefinition.java | 4 +-
.../cache/tier/sockets/ClientHealthMonitor.java | 4 +-
.../geode/internal/util/JavaWorkarounds.java | 34 +++++++
.../cache/query/cq/internal/CqServiceImpl.java | 4 +-
.../geode/cache/lucene/FlatFormatSerializer.java | 6 +-
10 files changed, 162 insertions(+), 9 deletions(-)
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManager.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManager.java
index 5b3ccf9..375db58 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManager.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManager.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.geode.connectors.jdbc.JdbcConnectorException;
import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
+import org.apache.geode.internal.util.JavaWorkarounds;
/**
* Given a tableName this manager will determine which column should correspond to the Geode Region
@@ -44,7 +45,7 @@ public class TableMetaDataManager {
public TableMetaDataView getTableMetaDataView(Connection connection,
RegionMapping regionMapping) {
- return tableToMetaDataMap.computeIfAbsent(computeTableName(regionMapping),
+ return JavaWorkarounds.computeIfAbsent(tableToMetaDataMap, computeTableName(regionMapping),
k -> computeTableMetaDataView(connection, k, regionMapping));
}
diff --git a/geode-core/src/jmh/java/org/apache/geode/internal/util/ComputeIfAbsentBenchmark.java b/geode-core/src/jmh/java/org/apache/geode/internal/util/ComputeIfAbsentBenchmark.java
new file mode 100644
index 0000000..88af9d6
--- /dev/null
+++ b/geode-core/src/jmh/java/org/apache/geode/internal/util/ComputeIfAbsentBenchmark.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.util;
+
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+
+/**
+ * Test spins up threads that constantly do computeIfAbsent
+ * The tests will measure throughput
+ * The benchmark tests computeIfAbsent in the presence of other threads contending for the same key
+ */
+
+@State(Scope.Thread)
+@Fork(1)
+public class ComputeIfAbsentBenchmark {
+
+ public Map map = new ConcurrentHashMap();
+ /*
+ * After load is established, how many measurements shall we take?
+ */
+ private static final double BENCHMARK_ITERATIONS = 10;
+
+ private static final int TIME_TO_QUIESCE_BEFORE_SAMPLING = 1;
+
+ private static final int THREAD_POOL_PROCESSOR_MULTIPLE = 2;
+
+ private ScheduledThreadPoolExecutor loadGenerationExecutorService;
+
+ private static boolean testRunning = true;
+
+ @Setup(Level.Trial)
+ public void trialSetup() throws InterruptedException {
+
+ final int numberOfThreads =
+ THREAD_POOL_PROCESSOR_MULTIPLE * Runtime.getRuntime().availableProcessors();
+
+ loadGenerationExecutorService =
+ (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(
+ numberOfThreads);
+
+ System.out.println(String.format("Pool has %d threads", numberOfThreads));
+
+ loadGenerationExecutorService.setRemoveOnCancelPolicy(true);
+
+ generateLoad(
+ loadGenerationExecutorService, numberOfThreads);
+
+ // allow system to quiesce
+ Thread.sleep(TIME_TO_QUIESCE_BEFORE_SAMPLING);
+ }
+
+ @TearDown(Level.Trial)
+ public void trialTeardown() {
+ testRunning = false;
+ loadGenerationExecutorService.shutdownNow();
+ }
+
+ @Benchmark
+ @Measurement(iterations = (int) BENCHMARK_ITERATIONS)
+ @BenchmarkMode(Mode.Throughput)
+ @OutputTimeUnit(TimeUnit.SECONDS)
+ // @Warmup we don't warm up because our @Setup warms us up
+ public Object computeIfAbsent() {
+ return JavaWorkarounds.computeIfAbsent(map, 1, k -> k);
+ }
+
+ private void generateLoad(final ScheduledExecutorService executorService, int numThreads) {
+ for (int i = 0; i < numThreads; i++) {
+ executorService.schedule(() -> {
+ while (testRunning) {
+ JavaWorkarounds.computeIfAbsent(map, 1, k -> k);
+ }
+ }, 0, TimeUnit.MILLISECONDS);
+ }
+ }
+
+}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/AttributeDescriptor.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/AttributeDescriptor.java
index 274468a..46009d5 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/AttributeDescriptor.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/AttributeDescriptor.java
@@ -33,6 +33,7 @@ import org.apache.geode.cache.query.NameNotFoundException;
import org.apache.geode.cache.query.QueryInvocationTargetException;
import org.apache.geode.cache.query.QueryService;
import org.apache.geode.internal.cache.Token;
+import org.apache.geode.internal.util.JavaWorkarounds;
import org.apache.geode.pdx.JSONFormatter;
import org.apache.geode.pdx.PdxSerializationException;
import org.apache.geode.pdx.internal.InternalPdxInstance;
@@ -150,7 +151,7 @@ public class AttributeDescriptor {
key.add(targetClass);
key.add(_name);
- Member m = _localCache.computeIfAbsent(key, k -> {
+ Member m = JavaWorkarounds.computeIfAbsent(_localCache, key, k -> {
Member member = getReadField(targetClass);
return member == null ? getReadMethod(targetClass) : member;
});
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
index 78d4619..2850b64 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
@@ -113,6 +113,7 @@ import org.apache.geode.internal.statistics.StatisticsManagerFactory;
import org.apache.geode.internal.statistics.StatisticsRegistry;
import org.apache.geode.internal.statistics.platform.LinuxProcFsStatistics;
import org.apache.geode.internal.tcp.ConnectionTable;
+import org.apache.geode.internal.util.JavaWorkarounds;
import org.apache.geode.management.ManagementException;
import org.apache.geode.pdx.internal.TypeRegistry;
import org.apache.geode.security.GemFireSecurityException;
@@ -1996,7 +1997,7 @@ public class InternalDistributedSystem extends DistributedSystem
private FunctionServiceStats functionServiceStats = null;
public FunctionStats getFunctionStats(String textId) {
- return functionExecutionStatsMap.computeIfAbsent(textId,
+ return JavaWorkarounds.computeIfAbsent(functionExecutionStatsMap, textId,
key -> new FunctionStats(this, key));
}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSEncrypt.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSEncrypt.java
index 82de1de..1b2e035 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSEncrypt.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSEncrypt.java
@@ -36,6 +36,7 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe
import org.apache.geode.distributed.internal.membership.NetView;
import org.apache.geode.distributed.internal.membership.gms.Services;
import org.apache.geode.distributed.internal.membership.gms.locator.GMSLocator;
+import org.apache.geode.internal.util.JavaWorkarounds;
public final class GMSEncrypt {
// Parameters for the Diffie-Hellman key exchange
@@ -197,7 +198,7 @@ public final class GMSEncrypt {
private GMSEncryptionCipherPool getPeerEncryptor(InternalDistributedMember member)
throws Exception {
- return peerEncryptors.computeIfAbsent(member, (mbr) -> {
+ return JavaWorkarounds.computeIfAbsent(peerEncryptors, member, (mbr) -> {
try {
return new GMSEncryptionCipherPool(this, generateSecret(lookupKeyByMember(member)));
} catch (Exception ex) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDefinition.java b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDefinition.java
index c88edd7..eaa63d3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDefinition.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDefinition.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.geode.cache.DiskStore;
+import org.apache.geode.internal.util.JavaWorkarounds;
class BackupDefinition {
@@ -79,7 +80,8 @@ class BackupDefinition {
}
void addOplogFileToBackup(DiskStore diskStore, Path fileLocation) {
- Set<Path> files = oplogFilesByDiskStore.computeIfAbsent(diskStore, k -> new HashSet<>());
+ Set<Path> files =
+ JavaWorkarounds.computeIfAbsent(oplogFilesByDiskStore, diskStore, k -> new HashSet<>());
files.add(fileLocation);
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
index f12f96c..90331bf 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
@@ -40,6 +40,7 @@ import org.apache.geode.internal.cache.TXManagerImpl;
import org.apache.geode.internal.cache.tier.ServerSideHandshake;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingThread;
+import org.apache.geode.internal.util.JavaWorkarounds;
/**
* Class <code>ClientHealthMonitor</code> is a server-side singleton that monitors the health of
@@ -655,7 +656,8 @@ public class ClientHealthMonitor {
}
public ServerConnectionCollection getProxyIdCollection(ClientProxyMembershipID proxyID) {
- return proxyIdConnections.computeIfAbsent(proxyID, key -> new ServerConnectionCollection());
+ return JavaWorkarounds.computeIfAbsent(proxyIdConnections, proxyID,
+ key -> new ServerConnectionCollection());
}
public Map<ClientProxyMembershipID, MutableInt> getCleanupProxyIdTable() {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/util/JavaWorkarounds.java b/geode-core/src/main/java/org/apache/geode/internal/util/JavaWorkarounds.java
new file mode 100644
index 0000000..fc1130d
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/util/JavaWorkarounds.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.util;
+
+import java.util.Map;
+import java.util.function.Function;
+
+public class JavaWorkarounds {
+
+
+ // This is a workaround for computeIfAbsent which unnecessarily takes out a write lock in the case
+ // where the entry for the key exists already. This only affects pre Java 9 jdks
+ // https://bugs.openjdk.java.net/browse/JDK-8161372
+ public static <K, V> V computeIfAbsent(Map<K, V> map, K key,
+ Function<? super K, ? extends V> mappingFunction) {
+ V existingValue = map.get(key);
+ if (existingValue == null) {
+ return map.computeIfAbsent(key, mappingFunction);
+ }
+ return existingValue;
+ }
+}
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqServiceImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqServiceImpl.java
index 7227eae..fa63e32 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqServiceImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqServiceImpl.java
@@ -80,6 +80,7 @@ import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.cache.tier.sockets.Part;
import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.util.JavaWorkarounds;
/**
* Implements the CqService functionality.
@@ -867,7 +868,8 @@ public class CqServiceImpl implements CqService {
@Override
public String constructServerCqName(String cqName, ClientProxyMembershipID clientProxyId) {
ConcurrentHashMap<ClientProxyMembershipID, String> cache =
- serverCqNameCache.computeIfAbsent(cqName, key -> new ConcurrentHashMap<>());
+ JavaWorkarounds.computeIfAbsent(serverCqNameCache, cqName,
+ key -> new ConcurrentHashMap<>());
String cName = cache.get(clientProxyId);
if (null == cName) {
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/FlatFormatSerializer.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/FlatFormatSerializer.java
index f94f631..62006db 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/FlatFormatSerializer.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/FlatFormatSerializer.java
@@ -28,6 +28,7 @@ import org.apache.lucene.document.Document;
import org.apache.geode.cache.lucene.internal.repository.serializer.SerializerUtil;
import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.util.JavaWorkarounds;
import org.apache.geode.pdx.PdxInstance;
/**
@@ -84,8 +85,9 @@ public class FlatFormatSerializer implements LuceneSerializer {
}
private List<String> tokenizeField(String indexedFieldName) {
- List<String> tokenizedFields = tokenizedFieldCache.computeIfAbsent(indexedFieldName,
- field -> Arrays.asList(indexedFieldName.split("\\.")));
+ List<String> tokenizedFields =
+ JavaWorkarounds.computeIfAbsent(tokenizedFieldCache, indexedFieldName,
+ field -> Arrays.asList(indexedFieldName.split("\\.")));
return tokenizedFields;
}