You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mc...@apache.org on 2019/06/06 23:54:41 UTC

[geode] branch develop updated: GEODE-6842: Making serialization of CqNameToOpHashMap thread safe (#3680)

This is an automated email from the ASF dual-hosted git repository.

mcmellawatt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 610992a  GEODE-6842: Making serialization of CqNameToOpHashMap thread safe (#3680)
610992a is described below

commit 610992ae8ba1c8d26756606bb62e871ea846544f
Author: Ryan McMahon <rm...@pivotal.io>
AuthorDate: Thu Jun 6 16:54:30 2019 -0700

    GEODE-6842: Making serialization of CqNameToOpHashMap thread safe (#3680)
    
    By making the CqNameToOpHashMap derive from ConcurrentHashMap and
    taking a snapshot prior to serialization, we avoid a race where
    this map could be mutated by a client registration thread while a
    queue GII which includes this map is occuring in another thread.
---
 .../sockets/CqNameToOpHashMapIntegrationTest.java  | 110 +++++++++++++++++++++
 .../tier/sockets/ClientUpdateMessageImpl.java      |   8 +-
 2 files changed, 115 insertions(+), 3 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CqNameToOpHashMapIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CqNameToOpHashMapIntegrationTest.java
new file mode 100644
index 0000000..d8628b2
--- /dev/null
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CqNameToOpHashMapIntegrationTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+import org.junit.Test;
+
+import org.apache.geode.DataSerializer;
+import org.apache.geode.internal.InternalDataSerializer;
+
+public class CqNameToOpHashMapIntegrationTest {
+  /**
+   * This test ensures that we can safely mutate the CqNameToOpHashMap while it is being
+   * serialized in another thread. The use case for this is that we repopulate this map
+   * during client registration, which can happen concurrently with a GII which causes
+   * serialization to occur if this map is referenced by any of the client subscription
+   * queues. This integration test does not exercise this full system level interaction,
+   * but rather does the minimum necessary to prove that map mutation and serialization
+   * can occur concurrently without any issues such as size mismatches or
+   * ConcurrentModificationExceptions.
+   */
+  @Test
+  public void testSendToWhileConcurrentlyModifyingMapContentsAndVerifyProperSerialization()
+      throws IOException, ClassNotFoundException, InterruptedException, ExecutionException,
+      TimeoutException {
+    final int numEntries = 1000000;
+
+    ClientUpdateMessageImpl.CqNameToOpHashMap originalCqNameToOpHashMap =
+        new ClientUpdateMessageImpl.CqNameToOpHashMap(numEntries);
+    ClientUpdateMessageImpl.CqNameToOpHashMap modifiedCqNameToOpHashMap =
+        new ClientUpdateMessageImpl.CqNameToOpHashMap(numEntries);
+
+    for (int i = 0; i < numEntries; ++i) {
+      originalCqNameToOpHashMap.add(String.valueOf(i), i);
+      modifiedCqNameToOpHashMap.add(String.valueOf(i), i);
+    }
+
+    CompletableFuture<Void> removeFromHashMapTask = CompletableFuture.runAsync(() -> {
+      for (int i = 0; i < numEntries; ++i) {
+        modifiedCqNameToOpHashMap.remove(String.valueOf(i), i);
+      }
+    });
+
+    CompletableFuture<Void> serializeReconstructHashMapTask = CompletableFuture.runAsync(() -> {
+      try {
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+
+        modifiedCqNameToOpHashMap.sendTo(outputStream);
+
+        ByteArrayInputStream byteArrayInputStream =
+            new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
+
+        DataInputStream inputStream = new DataInputStream(byteArrayInputStream);
+
+        byte typeByte = inputStream.readByte();
+        int cqNamesSize = InternalDataSerializer.readArrayLength(inputStream);
+
+        ClientUpdateMessageImpl.CqNameToOpHashMap reconstructedCqNameToOpHashMap =
+            new ClientUpdateMessageImpl.CqNameToOpHashMap(cqNamesSize);
+
+        for (int j = 0; j < cqNamesSize; j++) {
+          String cqNamesKey = DataSerializer.<String>readObject(inputStream);
+          Integer cqNamesValue = DataSerializer.<Integer>readObject(inputStream);
+          reconstructedCqNameToOpHashMap.add(cqNamesKey, cqNamesValue);
+        }
+
+        // The reconstructed map should have some subset of the entries in the cqNameToOpHashMap,
+        // but the specific contents will depend on timing with the removeFromHashMap task.
+        MapDifference<String, Integer> reconstructedVersusOriginalHashMapDifference =
+            Maps.difference(reconstructedCqNameToOpHashMap, originalCqNameToOpHashMap);
+        assertThat(reconstructedVersusOriginalHashMapDifference.entriesInCommon().size() >= 0)
+            .isTrue();
+        assertThat(reconstructedVersusOriginalHashMapDifference.entriesDiffering().size() == 0)
+            .isTrue();
+      } catch (Exception ex) {
+        throw new RuntimeException("Failed to serialize/deserialize the CqNameToOpHashMap", ex);
+      }
+    });
+
+    CompletableFuture.allOf(removeFromHashMapTask, serializeReconstructHashMapTask).get(1,
+        TimeUnit.MINUTES);
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
index 64c180b..b6dbac0 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
@@ -1557,9 +1557,11 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
     }
   }
   /**
-   * Basically just a HashMap<String, Integer> but limits itself to the CqNameToOp interface.
+   * Basically just a ConcurrentHashMap<String, Integer> but limits itself to the CqNameToOp
+   * interface.
    */
-  public static class CqNameToOpHashMap extends HashMap<String, Integer> implements CqNameToOp {
+  public static class CqNameToOpHashMap extends ConcurrentHashMap<String, Integer>
+      implements CqNameToOp {
     public CqNameToOpHashMap(int initialCapacity) {
       super(initialCapacity, 1.0f);
     }
@@ -1573,7 +1575,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
     public void sendTo(DataOutput out) throws IOException {
       // When serialized it needs to look just as if writeObject was called on a HASH_MAP
       out.writeByte(DSCODE.HASH_MAP.toByte());
-      DataSerializer.writeHashMap(this, out);
+      DataSerializer.writeConcurrentHashMap(this, out);
     }
 
     @Override