You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mc...@apache.org on 2019/06/06 23:54:41 UTC
[geode] branch develop updated: GEODE-6842: Making serialization of
CqNameToOpHashMap thread safe (#3680)
This is an automated email from the ASF dual-hosted git repository.
mcmellawatt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 610992a GEODE-6842: Making serialization of CqNameToOpHashMap thread safe (#3680)
610992a is described below
commit 610992ae8ba1c8d26756606bb62e871ea846544f
Author: Ryan McMahon <rm...@pivotal.io>
AuthorDate: Thu Jun 6 16:54:30 2019 -0700
GEODE-6842: Making serialization of CqNameToOpHashMap thread safe (#3680)
By making the CqNameToOpHashMap derive from ConcurrentHashMap and
taking a snapshot prior to serialization, we avoid a race where
this map could be mutated by a client registration thread while a
queue GII which includes this map is occuring in another thread.
---
.../sockets/CqNameToOpHashMapIntegrationTest.java | 110 +++++++++++++++++++++
.../tier/sockets/ClientUpdateMessageImpl.java | 8 +-
2 files changed, 115 insertions(+), 3 deletions(-)
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CqNameToOpHashMapIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CqNameToOpHashMapIntegrationTest.java
new file mode 100644
index 0000000..d8628b2
--- /dev/null
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CqNameToOpHashMapIntegrationTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+import org.junit.Test;
+
+import org.apache.geode.DataSerializer;
+import org.apache.geode.internal.InternalDataSerializer;
+
+public class CqNameToOpHashMapIntegrationTest {
+ /**
+ * This test ensures that we can safely mutate the CqNameToOpHashMap while it is being
+ * serialized in another thread. The use case for this is that we repopulate this map
+ * during client registration, which can happen concurrently with a GII which causes
+ * serialization to occur if this map is referenced by any of the client subscription
+ * queues. This integration test does not exercise this full system level interaction,
+ * but rather does the minimum necessary to prove that map mutation and serialization
+ * can occur concurrently without any issues such as size mismatches or
+ * ConcurrentModificationExceptions.
+ */
+ @Test
+ public void testSendToWhileConcurrentlyModifyingMapContentsAndVerifyProperSerialization()
+ throws IOException, ClassNotFoundException, InterruptedException, ExecutionException,
+ TimeoutException {
+ final int numEntries = 1000000;
+
+ ClientUpdateMessageImpl.CqNameToOpHashMap originalCqNameToOpHashMap =
+ new ClientUpdateMessageImpl.CqNameToOpHashMap(numEntries);
+ ClientUpdateMessageImpl.CqNameToOpHashMap modifiedCqNameToOpHashMap =
+ new ClientUpdateMessageImpl.CqNameToOpHashMap(numEntries);
+
+ for (int i = 0; i < numEntries; ++i) {
+ originalCqNameToOpHashMap.add(String.valueOf(i), i);
+ modifiedCqNameToOpHashMap.add(String.valueOf(i), i);
+ }
+
+ CompletableFuture<Void> removeFromHashMapTask = CompletableFuture.runAsync(() -> {
+ for (int i = 0; i < numEntries; ++i) {
+ modifiedCqNameToOpHashMap.remove(String.valueOf(i), i);
+ }
+ });
+
+ CompletableFuture<Void> serializeReconstructHashMapTask = CompletableFuture.runAsync(() -> {
+ try {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+
+ modifiedCqNameToOpHashMap.sendTo(outputStream);
+
+ ByteArrayInputStream byteArrayInputStream =
+ new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
+
+ DataInputStream inputStream = new DataInputStream(byteArrayInputStream);
+
+ byte typeByte = inputStream.readByte();
+ int cqNamesSize = InternalDataSerializer.readArrayLength(inputStream);
+
+ ClientUpdateMessageImpl.CqNameToOpHashMap reconstructedCqNameToOpHashMap =
+ new ClientUpdateMessageImpl.CqNameToOpHashMap(cqNamesSize);
+
+ for (int j = 0; j < cqNamesSize; j++) {
+ String cqNamesKey = DataSerializer.<String>readObject(inputStream);
+ Integer cqNamesValue = DataSerializer.<Integer>readObject(inputStream);
+ reconstructedCqNameToOpHashMap.add(cqNamesKey, cqNamesValue);
+ }
+
+ // The reconstructed map should have some subset of the entries in the cqNameToOpHashMap,
+ // but the specific contents will depend on timing with the removeFromHashMap task.
+ MapDifference<String, Integer> reconstructedVersusOriginalHashMapDifference =
+ Maps.difference(reconstructedCqNameToOpHashMap, originalCqNameToOpHashMap);
+ assertThat(reconstructedVersusOriginalHashMapDifference.entriesInCommon().size() >= 0)
+ .isTrue();
+ assertThat(reconstructedVersusOriginalHashMapDifference.entriesDiffering().size() == 0)
+ .isTrue();
+ } catch (Exception ex) {
+ throw new RuntimeException("Failed to serialize/deserialize the CqNameToOpHashMap", ex);
+ }
+ });
+
+ CompletableFuture.allOf(removeFromHashMapTask, serializeReconstructHashMapTask).get(1,
+ TimeUnit.MINUTES);
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
index 64c180b..b6dbac0 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
@@ -1557,9 +1557,11 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
}
}
/**
- * Basically just a HashMap<String, Integer> but limits itself to the CqNameToOp interface.
+ * Basically just a ConcurrentHashMap<String, Integer> but limits itself to the CqNameToOp
+ * interface.
*/
- public static class CqNameToOpHashMap extends HashMap<String, Integer> implements CqNameToOp {
+ public static class CqNameToOpHashMap extends ConcurrentHashMap<String, Integer>
+ implements CqNameToOp {
public CqNameToOpHashMap(int initialCapacity) {
super(initialCapacity, 1.0f);
}
@@ -1573,7 +1575,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
public void sendTo(DataOutput out) throws IOException {
// When serialized it needs to look just as if writeObject was called on a HASH_MAP
out.writeByte(DSCODE.HASH_MAP.toByte());
- DataSerializer.writeHashMap(this, out);
+ DataSerializer.writeConcurrentHashMap(this, out);
}
@Override