You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ja...@apache.org on 2019/02/25 18:31:31 UTC

[geode] branch develop updated: GEODE-6404: work around possible sync issue with computeIfAbsent (#3196)

This is an automated email from the ASF dual-hosted git repository.

jasonhuynh pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 2be6375  GEODE-6404: work around possible sync issue with computeIfAbsent (#3196)
2be6375 is described below

commit 2be6375a775b6b0d00d0c41a1e2a3bf4b8745a46
Author: Jason Huynh <hu...@gmail.com>
AuthorDate: Mon Feb 25 18:31:21 2019 +0000

    GEODE-6404: work around possible sync issue with computeIfAbsent (#3196)
    
    * GEODE-6404: work around possible synchronization issue with computeIfAbsent
    
      *  Tries to do the get() outside of a lock before computing
      *  Added jmh test
      *  Local benchmarking showed that although jdk 11 fixes the contention issue that
           performing the get was faster than the retrieve mechanism of computeIfAbsent
---
 .../jdbc/internal/TableMetaDataManager.java        |   3 +-
 .../internal/util/ComputeIfAbsentBenchmark.java    | 107 +++++++++++++++++++++
 .../cache/query/internal/AttributeDescriptor.java  |   3 +-
 .../internal/InternalDistributedSystem.java        |   3 +-
 .../membership/gms/messenger/GMSEncrypt.java       |   3 +-
 .../internal/cache/backup/BackupDefinition.java    |   4 +-
 .../cache/tier/sockets/ClientHealthMonitor.java    |   4 +-
 .../geode/internal/util/JavaWorkarounds.java       |  34 +++++++
 .../cache/query/cq/internal/CqServiceImpl.java     |   4 +-
 .../geode/cache/lucene/FlatFormatSerializer.java   |   6 +-
 10 files changed, 162 insertions(+), 9 deletions(-)

diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManager.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManager.java
index f2b20df..1cce4d5 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManager.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManager.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.geode.connectors.jdbc.JdbcConnectorException;
 import org.apache.geode.connectors.jdbc.internal.TableMetaData.ColumnMetaData;
 import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
+import org.apache.geode.internal.util.JavaWorkarounds;
 
 /**
  * Given a tableName this manager will determine which column should correspond to the Geode Region
@@ -46,7 +47,7 @@ public class TableMetaDataManager {
 
   public TableMetaDataView getTableMetaDataView(Connection connection,
       RegionMapping regionMapping) {
-    return tableToMetaDataMap.computeIfAbsent(computeTableName(regionMapping),
+    return JavaWorkarounds.computeIfAbsent(tableToMetaDataMap, computeTableName(regionMapping),
         k -> computeTableMetaDataView(connection, k, regionMapping));
   }
 
diff --git a/geode-core/src/jmh/java/org/apache/geode/internal/util/ComputeIfAbsentBenchmark.java b/geode-core/src/jmh/java/org/apache/geode/internal/util/ComputeIfAbsentBenchmark.java
new file mode 100644
index 0000000..88af9d6
--- /dev/null
+++ b/geode-core/src/jmh/java/org/apache/geode/internal/util/ComputeIfAbsentBenchmark.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.util;
+
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+
+/**
+ * Test spins up threads that constantly do computeIfAbsent
+ * The tests will measure throughput
+ * The benchmark tests computeIfAbsent in the presence of other threads contending for the same key
+ */
+
+@State(Scope.Thread)
+@Fork(1)
+public class ComputeIfAbsentBenchmark {
+
+  public Map map = new ConcurrentHashMap();
+  /*
+   * After load is established, how many measurements shall we take?
+   */
+  private static final double BENCHMARK_ITERATIONS = 10;
+
+  private static final int TIME_TO_QUIESCE_BEFORE_SAMPLING = 1;
+
+  private static final int THREAD_POOL_PROCESSOR_MULTIPLE = 2;
+
+  private ScheduledThreadPoolExecutor loadGenerationExecutorService;
+
+  private static boolean testRunning = true;
+
+  @Setup(Level.Trial)
+  public void trialSetup() throws InterruptedException {
+
+    final int numberOfThreads =
+        THREAD_POOL_PROCESSOR_MULTIPLE * Runtime.getRuntime().availableProcessors();
+
+    loadGenerationExecutorService =
+        (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(
+            numberOfThreads);
+
+    System.out.println(String.format("Pool has %d threads", numberOfThreads));
+
+    loadGenerationExecutorService.setRemoveOnCancelPolicy(true);
+
+    generateLoad(
+        loadGenerationExecutorService, numberOfThreads);
+
+    // allow system to quiesce
+    Thread.sleep(TIME_TO_QUIESCE_BEFORE_SAMPLING);
+  }
+
+  @TearDown(Level.Trial)
+  public void trialTeardown() {
+    testRunning = false;
+    loadGenerationExecutorService.shutdownNow();
+  }
+
+  @Benchmark
+  @Measurement(iterations = (int) BENCHMARK_ITERATIONS)
+  @BenchmarkMode(Mode.Throughput)
+  @OutputTimeUnit(TimeUnit.SECONDS)
+  // @Warmup we don't warm up because our @Setup warms us up
+  public Object computeIfAbsent() {
+    return JavaWorkarounds.computeIfAbsent(map, 1, k -> k);
+  }
+
+  private void generateLoad(final ScheduledExecutorService executorService, int numThreads) {
+    for (int i = 0; i < numThreads; i++) {
+      executorService.schedule(() -> {
+        while (testRunning) {
+          JavaWorkarounds.computeIfAbsent(map, 1, k -> k);
+        }
+      }, 0, TimeUnit.MILLISECONDS);
+    }
+  }
+
+}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/AttributeDescriptor.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/AttributeDescriptor.java
index e50f97a..1555f67 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/AttributeDescriptor.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/AttributeDescriptor.java
@@ -34,6 +34,7 @@ import org.apache.geode.cache.query.NameNotFoundException;
 import org.apache.geode.cache.query.QueryInvocationTargetException;
 import org.apache.geode.cache.query.QueryService;
 import org.apache.geode.internal.cache.Token;
+import org.apache.geode.internal.util.JavaWorkarounds;
 import org.apache.geode.pdx.JSONFormatter;
 import org.apache.geode.pdx.PdxSerializationException;
 import org.apache.geode.pdx.internal.InternalPdxInstance;
@@ -152,7 +153,7 @@ public class AttributeDescriptor {
     key.add(targetClass);
     key.add(_name);
 
-    Member m = _localCache.computeIfAbsent(key, k -> {
+    Member m = JavaWorkarounds.computeIfAbsent(_localCache, key, k -> {
       Member member = getReadField(targetClass);
       return member == null ? getReadMethod(targetClass) : member;
     });
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
index 4011669..82e885a 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
@@ -116,6 +116,7 @@ import org.apache.geode.internal.statistics.StatisticsManagerFactory;
 import org.apache.geode.internal.statistics.StatisticsRegistry;
 import org.apache.geode.internal.statistics.platform.LinuxProcFsStatistics;
 import org.apache.geode.internal.tcp.ConnectionTable;
+import org.apache.geode.internal.util.JavaWorkarounds;
 import org.apache.geode.management.ManagementException;
 import org.apache.geode.pdx.internal.TypeRegistry;
 import org.apache.geode.security.GemFireSecurityException;
@@ -2006,7 +2007,7 @@ public class InternalDistributedSystem extends DistributedSystem
   private FunctionServiceStats functionServiceStats = null;
 
   public FunctionStats getFunctionStats(String textId) {
-    return functionExecutionStatsMap.computeIfAbsent(textId,
+    return JavaWorkarounds.computeIfAbsent(functionExecutionStatsMap, textId,
         key -> new FunctionStats(this, key));
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSEncrypt.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSEncrypt.java
index 82de1de..1b2e035 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSEncrypt.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSEncrypt.java
@@ -36,6 +36,7 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe
 import org.apache.geode.distributed.internal.membership.NetView;
 import org.apache.geode.distributed.internal.membership.gms.Services;
 import org.apache.geode.distributed.internal.membership.gms.locator.GMSLocator;
+import org.apache.geode.internal.util.JavaWorkarounds;
 
 public final class GMSEncrypt {
   // Parameters for the Diffie-Hellman key exchange
@@ -197,7 +198,7 @@ public final class GMSEncrypt {
 
   private GMSEncryptionCipherPool getPeerEncryptor(InternalDistributedMember member)
       throws Exception {
-    return peerEncryptors.computeIfAbsent(member, (mbr) -> {
+    return JavaWorkarounds.computeIfAbsent(peerEncryptors, member, (mbr) -> {
       try {
         return new GMSEncryptionCipherPool(this, generateSecret(lookupKeyByMember(member)));
       } catch (Exception ex) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDefinition.java b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDefinition.java
index c88edd7..eaa63d3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDefinition.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDefinition.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.geode.cache.DiskStore;
+import org.apache.geode.internal.util.JavaWorkarounds;
 
 class BackupDefinition {
 
@@ -79,7 +80,8 @@ class BackupDefinition {
   }
 
   void addOplogFileToBackup(DiskStore diskStore, Path fileLocation) {
-    Set<Path> files = oplogFilesByDiskStore.computeIfAbsent(diskStore, k -> new HashSet<>());
+    Set<Path> files =
+        JavaWorkarounds.computeIfAbsent(oplogFilesByDiskStore, diskStore, k -> new HashSet<>());
     files.add(fileLocation);
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
index 421e427..562872a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
@@ -41,6 +41,7 @@ import org.apache.geode.internal.cache.TXManagerImpl;
 import org.apache.geode.internal.cache.tier.ServerSideHandshake;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.LoggingThread;
+import org.apache.geode.internal.util.JavaWorkarounds;
 
 /**
  * Class <code>ClientHealthMonitor</code> is a server-side singleton that monitors the health of
@@ -658,7 +659,8 @@ public class ClientHealthMonitor {
   }
 
   public ServerConnectionCollection getProxyIdCollection(ClientProxyMembershipID proxyID) {
-    return proxyIdConnections.computeIfAbsent(proxyID, key -> new ServerConnectionCollection());
+    return JavaWorkarounds.computeIfAbsent(proxyIdConnections, proxyID,
+        key -> new ServerConnectionCollection());
   }
 
   public Map<ClientProxyMembershipID, MutableInt> getCleanupProxyIdTable() {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/util/JavaWorkarounds.java b/geode-core/src/main/java/org/apache/geode/internal/util/JavaWorkarounds.java
new file mode 100644
index 0000000..fc1130d
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/util/JavaWorkarounds.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.util;
+
+import java.util.Map;
+import java.util.function.Function;
+
+public class JavaWorkarounds {
+
+
+  // This is a workaround for computeIfAbsent which unnecessarily takes out a write lock in the case
+  // where the entry for the key exists already. This only affects pre Java 9 jdks
+  // https://bugs.openjdk.java.net/browse/JDK-8161372
+  public static <K, V> V computeIfAbsent(Map<K, V> map, K key,
+      Function<? super K, ? extends V> mappingFunction) {
+    V existingValue = map.get(key);
+    if (existingValue == null) {
+      return map.computeIfAbsent(key, mappingFunction);
+    }
+    return existingValue;
+  }
+}
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqServiceImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqServiceImpl.java
index 7227eae..fa63e32 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqServiceImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqServiceImpl.java
@@ -80,6 +80,7 @@ import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.cache.tier.sockets.Part;
 import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.util.JavaWorkarounds;
 
 /**
  * Implements the CqService functionality.
@@ -867,7 +868,8 @@ public class CqServiceImpl implements CqService {
   @Override
   public String constructServerCqName(String cqName, ClientProxyMembershipID clientProxyId) {
     ConcurrentHashMap<ClientProxyMembershipID, String> cache =
-        serverCqNameCache.computeIfAbsent(cqName, key -> new ConcurrentHashMap<>());
+        JavaWorkarounds.computeIfAbsent(serverCqNameCache, cqName,
+            key -> new ConcurrentHashMap<>());
 
     String cName = cache.get(clientProxyId);
     if (null == cName) {
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/FlatFormatSerializer.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/FlatFormatSerializer.java
index f94f631..62006db 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/FlatFormatSerializer.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/FlatFormatSerializer.java
@@ -28,6 +28,7 @@ import org.apache.lucene.document.Document;
 
 import org.apache.geode.cache.lucene.internal.repository.serializer.SerializerUtil;
 import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.util.JavaWorkarounds;
 import org.apache.geode.pdx.PdxInstance;
 
 /**
@@ -84,8 +85,9 @@ public class FlatFormatSerializer implements LuceneSerializer {
   }
 
   private List<String> tokenizeField(String indexedFieldName) {
-    List<String> tokenizedFields = tokenizedFieldCache.computeIfAbsent(indexedFieldName,
-        field -> Arrays.asList(indexedFieldName.split("\\.")));
+    List<String> tokenizedFields =
+        JavaWorkarounds.computeIfAbsent(tokenizedFieldCache, indexedFieldName,
+            field -> Arrays.asList(indexedFieldName.split("\\.")));
     return tokenizedFields;
   }