You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2017/08/02 00:43:22 UTC
geode git commit: GEODE-3315: Replaced PreferBytes... with
VMCachedDeserializable
Repository: geode
Updated Branches:
refs/heads/develop ab90d406b -> 56ea940d3
GEODE-3315: Replaced PreferBytes... with VMCachedDeserializable
When getting a HAEventWrapper as part of a GII, make sure that we store
the wrapper in a VMCachedDeserializable. This object needs to have a
reference to the HAContainer. If PREFER_SERIALIZED is set to true, we
were using a PreferBytesSerializable, which would always create a new copy of
the HAEventWrapper.
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/56ea940d
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/56ea940d
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/56ea940d
Branch: refs/heads/develop
Commit: 56ea940d3c826e98b16d6b508fc834f7bd50220c
Parents: ab90d40
Author: Barry Oglesby <bo...@pivotal.io>
Authored: Thu Jun 22 13:52:24 2017 -0700
Committer: Dan Smith <up...@apache.org>
Committed: Tue Aug 1 15:30:43 2017 -0700
----------------------------------------------------------------------
.../geode/internal/cache/AbstractRegionMap.java | 12 +-
.../ha/PreferSerializedHARegionQueueTest.java | 184 +++++++++++++++++++
2 files changed, 189 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/56ea940d/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
index f958f94..fd5a430 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
@@ -771,12 +771,11 @@ public abstract class AbstractRegionMap implements RegionMap {
if (owner instanceof HARegion && newValue instanceof CachedDeserializable) {
Object actualVal = null;
+ CachedDeserializable newValueCd = (CachedDeserializable) newValue;
try {
- actualVal =
- BlobHelper.deserializeBlob(((CachedDeserializable) newValue).getSerializedValue(),
- sender.getVersionObject(), null);
- newValue = CachedDeserializableFactory.create(actualVal,
- ((CachedDeserializable) newValue).getValueSizeInBytes());
+ actualVal = BlobHelper.deserializeBlob(newValueCd.getSerializedValue(),
+ sender.getVersionObject(), null);
+ newValue = new VMCachedDeserializable(actualVal, newValueCd.getSizeInBytes());
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("Unable to deserialize HA event for region " + owner);
}
@@ -809,8 +808,7 @@ public abstract class AbstractRegionMap implements RegionMap {
HARegionQueue.addClientCQsAndInterestList(oldMsg, haEventWrapper, haContainer,
owner.getName());
haEventWrapper.setClientUpdateMessage(null);
- newValue = CachedDeserializableFactory.create(original,
- ((CachedDeserializable) newValue).getSizeInBytes());
+ newValue = new VMCachedDeserializable(original, newValueCd.getSizeInBytes());
} else {
original = null;
}
http://git-wip-us.apache.org/repos/asf/geode/blob/56ea940d/geode-core/src/test/java/org/apache/geode/internal/cache/ha/PreferSerializedHARegionQueueTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/PreferSerializedHARegionQueueTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/PreferSerializedHARegionQueueTest.java
new file mode 100644
index 0000000..59aa79a
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/PreferSerializedHARegionQueueTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.ha;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.PoolFactory;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
+import org.apache.geode.internal.cache.tier.sockets.HAEventWrapper;
+import org.apache.geode.test.dunit.DUnitEnv;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+@Category(DistributedTest.class)
+public class PreferSerializedHARegionQueueTest extends JUnit4CacheTestCase {
+
+ private static final long serialVersionUID = 1L;
+
+ @Test
+ public void copyingHARegionQueueShouldNotThrowException() throws Exception {
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = host.getVM(2);
+ VM vm3 = host.getVM(3);
+ VM vm4 = host.getVM(4);
+ VM vm5 = host.getVM(5);
+ VM vm6 = host.getVM(6);
+
+ // Set prefer serialized
+ vm1.invoke(() -> setPreferSerialized());
+ vm2.invoke(() -> setPreferSerialized());
+ vm3.invoke(() -> setPreferSerialized());
+ vm4.invoke(() -> setPreferSerialized());
+
+ String regionName = getTestMethodName() + "_PR";
+ try {
+ // Initialize initial cache servers
+ vm1.invoke(() -> initializeServer(regionName));
+ vm2.invoke(() -> initializeServer(regionName));
+
+ // Create register interest client
+ vm5.invoke(() -> createClient(regionName, true, 1, Integer.MAX_VALUE));
+
+ // Wait for both primary and secondary servers to establish proxies
+ vm1.invoke(() -> waitForCacheClientProxies(1));
+ vm2.invoke(() -> waitForCacheClientProxies(1));
+
+ // Create client loader and load entries
+ int numPuts = 10;
+ vm6.invoke(
+ () -> createClient(regionName, false, 0, PoolFactory.DEFAULT_SUBSCRIPTION_ACK_INTERVAL));
+ vm6.invoke(() -> {
+ Region region = getCache().getRegion(regionName);
+ IntStream.range(0, numPuts).forEach(i -> region.put(i, i));
+ });
+
+ // Verify HARegion sizes
+ vm1.invoke(() -> waitForHARegionSize(numPuts));
+ vm2.invoke(() -> waitForHARegionSize(numPuts));
+
+ // Initialize next cache server
+ vm3.invoke(() -> initializeServer(regionName));
+
+ // Stop one of the original cache servers
+ vm1.invoke(() -> closeCache());
+
+ // Wait for new cache server to establish proxies
+ vm3.invoke(() -> waitForCacheClientProxies(1));
+
+ // Verify HARegion size
+ vm3.invoke(() -> waitForHARegionSize(numPuts));
+
+ // Initialize final cache server
+ vm4.invoke(() -> initializeServer(regionName));
+
+ // Stop other original cache server
+ vm2.invoke(() -> closeCache());
+
+ // Wait for new cache server to establish proxies
+ vm4.invoke(() -> waitForCacheClientProxies(1));
+
+ // Verify HARegion size
+ vm4.invoke(() -> waitForHARegionSize(numPuts));
+
+ // Stop the clients to prevent suspect strings when the servers are stopped
+ vm5.invoke(() -> closeCache());
+ vm6.invoke(() -> closeCache());
+ } finally {
+ // Clear prefer serialized
+ vm1.invoke(() -> clearPreferSerialized());
+ vm2.invoke(() -> clearPreferSerialized());
+ vm3.invoke(() -> clearPreferSerialized());
+ vm4.invoke(() -> clearPreferSerialized());
+ }
+ }
+
+ public void initializeServer(String regionName) throws IOException {
+ getCache().createRegionFactory(RegionShortcut.PARTITION).create(regionName);
+
+ final CacheServer cacheServer = getCache().addCacheServer();
+ cacheServer.setPort(0);
+ cacheServer.start();
+ }
+
+ public void createClient(String regionName, boolean subscriptionEnabled,
+ int subscriptionRedundancy, int subscriptionAckInterval) {
+
+ ClientCacheFactory clientCacheFactory =
+ new ClientCacheFactory().setPoolSubscriptionAckInterval(subscriptionAckInterval)
+ .setPoolSubscriptionEnabled(subscriptionEnabled)
+ .setPoolSubscriptionRedundancy(subscriptionRedundancy)
+ .addPoolLocator("localhost", DUnitEnv.get().getLocatorPort());
+
+ ClientCache cache = getClientCache(clientCacheFactory);
+
+ Region region = cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
+ if (subscriptionEnabled) {
+ region.registerInterest("ALL_KEYS");
+ }
+ }
+
+ public static void setPreferSerialized() {
+ System.setProperty("gemfire.PREFER_SERIALIZED", "true");
+ }
+
+ public static void clearPreferSerialized() {
+ System.clearProperty("gemfire.PREFER_SERIALIZED");
+ }
+
+ public void waitForCacheClientProxies(final int expectedSize) {
+ final CacheServer cs = getCache().getCacheServers().iterator().next();
+ Awaitility.await().atMost(1, TimeUnit.MINUTES)
+ .until(() -> assertEquals(expectedSize, cs.getAllClientSessions().size()));
+ }
+
+ public void waitForHARegionSize(final int expectedSize) {
+ final CacheServer cs = getCache().getCacheServers().iterator().next();
+ final CacheClientProxy ccp = (CacheClientProxy) cs.getAllClientSessions().iterator().next();
+ Awaitility.await().atMost(1, TimeUnit.MINUTES)
+ .until(() -> assertEquals(expectedSize, getHAEventsCount(ccp)));
+ }
+
+ private static int getHAEventsCount(CacheClientProxy ccp) {
+ Region haRegion = ccp.getHARegion();
+ if (haRegion == null) {
+ return 0;
+ }
+ int count = 0;
+ for (Object value : haRegion.values()) {
+ if (value instanceof HAEventWrapper) {
+ count += 1;
+ }
+ }
+ return count;
+ }
+}