You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2017/08/02 00:43:22 UTC

geode git commit: GEODE-3315: Replaced PreferBytes... with VMCachedDeserializable

Repository: geode
Updated Branches:
  refs/heads/develop ab90d406b -> 56ea940d3


GEODE-3315: Replaced PreferBytes... with VMCachedDeserializable

When getting a HAEventWrapper as part of a GII, make sure that we store
the wrapper in a VMCachedDeserializable. This object needs to have a
reference to the HAContainer. If PREFER_SERIALIZED is set to true, we
were using a PreferBytesSerializable, which would always create a new copy of
the HAEventWrapper.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/56ea940d
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/56ea940d
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/56ea940d

Branch: refs/heads/develop
Commit: 56ea940d3c826e98b16d6b508fc834f7bd50220c
Parents: ab90d40
Author: Barry Oglesby <bo...@pivotal.io>
Authored: Thu Jun 22 13:52:24 2017 -0700
Committer: Dan Smith <up...@apache.org>
Committed: Tue Aug 1 15:30:43 2017 -0700

----------------------------------------------------------------------
 .../geode/internal/cache/AbstractRegionMap.java |  12 +-
 .../ha/PreferSerializedHARegionQueueTest.java   | 184 +++++++++++++++++++
 2 files changed, 189 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/56ea940d/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
index f958f94..fd5a430 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
@@ -771,12 +771,11 @@ public abstract class AbstractRegionMap implements RegionMap {
 
     if (owner instanceof HARegion && newValue instanceof CachedDeserializable) {
       Object actualVal = null;
+      CachedDeserializable newValueCd = (CachedDeserializable) newValue;
       try {
-        actualVal =
-            BlobHelper.deserializeBlob(((CachedDeserializable) newValue).getSerializedValue(),
-                sender.getVersionObject(), null);
-        newValue = CachedDeserializableFactory.create(actualVal,
-            ((CachedDeserializable) newValue).getValueSizeInBytes());
+        actualVal = BlobHelper.deserializeBlob(newValueCd.getSerializedValue(),
+            sender.getVersionObject(), null);
+        newValue = new VMCachedDeserializable(actualVal, newValueCd.getSizeInBytes());
       } catch (IOException | ClassNotFoundException e) {
         throw new RuntimeException("Unable to deserialize HA event for region " + owner);
       }
@@ -809,8 +808,7 @@ public abstract class AbstractRegionMap implements RegionMap {
                 HARegionQueue.addClientCQsAndInterestList(oldMsg, haEventWrapper, haContainer,
                     owner.getName());
                 haEventWrapper.setClientUpdateMessage(null);
-                newValue = CachedDeserializableFactory.create(original,
-                    ((CachedDeserializable) newValue).getSizeInBytes());
+                newValue = new VMCachedDeserializable(original, newValueCd.getSizeInBytes());
               } else {
                 original = null;
               }

http://git-wip-us.apache.org/repos/asf/geode/blob/56ea940d/geode-core/src/test/java/org/apache/geode/internal/cache/ha/PreferSerializedHARegionQueueTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/PreferSerializedHARegionQueueTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/PreferSerializedHARegionQueueTest.java
new file mode 100644
index 0000000..59aa79a
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/PreferSerializedHARegionQueueTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.ha;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.PoolFactory;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
+import org.apache.geode.internal.cache.tier.sockets.HAEventWrapper;
+import org.apache.geode.test.dunit.DUnitEnv;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+@Category(DistributedTest.class)
+public class PreferSerializedHARegionQueueTest extends JUnit4CacheTestCase {
+
+  private static final long serialVersionUID = 1L;
+
+  @Test
+  public void copyingHARegionQueueShouldNotThrowException() throws Exception {
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+    VM vm3 = host.getVM(3);
+    VM vm4 = host.getVM(4);
+    VM vm5 = host.getVM(5);
+    VM vm6 = host.getVM(6);
+
+    // Set prefer serialized
+    vm1.invoke(() -> setPreferSerialized());
+    vm2.invoke(() -> setPreferSerialized());
+    vm3.invoke(() -> setPreferSerialized());
+    vm4.invoke(() -> setPreferSerialized());
+
+    String regionName = getTestMethodName() + "_PR";
+    try {
+      // Initialize initial cache servers
+      vm1.invoke(() -> initializeServer(regionName));
+      vm2.invoke(() -> initializeServer(regionName));
+
+      // Create register interest client
+      vm5.invoke(() -> createClient(regionName, true, 1, Integer.MAX_VALUE));
+
+      // Wait for both primary and secondary servers to establish proxies
+      vm1.invoke(() -> waitForCacheClientProxies(1));
+      vm2.invoke(() -> waitForCacheClientProxies(1));
+
+      // Create client loader and load entries
+      int numPuts = 10;
+      vm6.invoke(
+          () -> createClient(regionName, false, 0, PoolFactory.DEFAULT_SUBSCRIPTION_ACK_INTERVAL));
+      vm6.invoke(() -> {
+        Region region = getCache().getRegion(regionName);
+        IntStream.range(0, numPuts).forEach(i -> region.put(i, i));
+      });
+
+      // Verify HARegion sizes
+      vm1.invoke(() -> waitForHARegionSize(numPuts));
+      vm2.invoke(() -> waitForHARegionSize(numPuts));
+
+      // Initialize next cache server
+      vm3.invoke(() -> initializeServer(regionName));
+
+      // Stop one of the original cache servers
+      vm1.invoke(() -> closeCache());
+
+      // Wait for new cache server to establish proxies
+      vm3.invoke(() -> waitForCacheClientProxies(1));
+
+      // Verify HARegion size
+      vm3.invoke(() -> waitForHARegionSize(numPuts));
+
+      // Initialize final cache server
+      vm4.invoke(() -> initializeServer(regionName));
+
+      // Stop other original cache server
+      vm2.invoke(() -> closeCache());
+
+      // Wait for new cache server to establish proxies
+      vm4.invoke(() -> waitForCacheClientProxies(1));
+
+      // Verify HARegion size
+      vm4.invoke(() -> waitForHARegionSize(numPuts));
+
+      // Stop the clients to prevent suspect strings when the servers are stopped
+      vm5.invoke(() -> closeCache());
+      vm6.invoke(() -> closeCache());
+    } finally {
+      // Clear prefer serialized
+      vm1.invoke(() -> clearPreferSerialized());
+      vm2.invoke(() -> clearPreferSerialized());
+      vm3.invoke(() -> clearPreferSerialized());
+      vm4.invoke(() -> clearPreferSerialized());
+    }
+  }
+
+  public void initializeServer(String regionName) throws IOException {
+    getCache().createRegionFactory(RegionShortcut.PARTITION).create(regionName);
+
+    final CacheServer cacheServer = getCache().addCacheServer();
+    cacheServer.setPort(0);
+    cacheServer.start();
+  }
+
+  public void createClient(String regionName, boolean subscriptionEnabled,
+      int subscriptionRedundancy, int subscriptionAckInterval) {
+
+    ClientCacheFactory clientCacheFactory =
+        new ClientCacheFactory().setPoolSubscriptionAckInterval(subscriptionAckInterval)
+            .setPoolSubscriptionEnabled(subscriptionEnabled)
+            .setPoolSubscriptionRedundancy(subscriptionRedundancy)
+            .addPoolLocator("localhost", DUnitEnv.get().getLocatorPort());
+
+    ClientCache cache = getClientCache(clientCacheFactory);
+
+    Region region = cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
+    if (subscriptionEnabled) {
+      region.registerInterest("ALL_KEYS");
+    }
+  }
+
+  public static void setPreferSerialized() {
+    System.setProperty("gemfire.PREFER_SERIALIZED", "true");
+  }
+
+  public static void clearPreferSerialized() {
+    System.clearProperty("gemfire.PREFER_SERIALIZED");
+  }
+
+  public void waitForCacheClientProxies(final int expectedSize) {
+    final CacheServer cs = getCache().getCacheServers().iterator().next();
+    Awaitility.await().atMost(1, TimeUnit.MINUTES)
+        .until(() -> assertEquals(expectedSize, cs.getAllClientSessions().size()));
+  }
+
+  public void waitForHARegionSize(final int expectedSize) {
+    final CacheServer cs = getCache().getCacheServers().iterator().next();
+    final CacheClientProxy ccp = (CacheClientProxy) cs.getAllClientSessions().iterator().next();
+    Awaitility.await().atMost(1, TimeUnit.MINUTES)
+        .until(() -> assertEquals(expectedSize, getHAEventsCount(ccp)));
+  }
+
+  private static int getHAEventsCount(CacheClientProxy ccp) {
+    Region haRegion = ccp.getHARegion();
+    if (haRegion == null) {
+      return 0;
+    }
+    int count = 0;
+    for (Object value : haRegion.values()) {
+      if (value instanceof HAEventWrapper) {
+        count += 1;
+      }
+    }
+    return count;
+  }
+}