You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2018/01/17 23:38:31 UTC
[geode] branch develop updated: GEODE-4285: Get a distributed lock
if we can't find a PDX type
This is an automated email from the ASF dual-hosted git repository.
upthewaterspout pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new da42607 GEODE-4285: Get a distributed lock if we can't find a PDX type
da42607 is described below
commit da426077cc621196907df59c6a9e5f7dce907333
Author: Dan Smith <ds...@pivotal.io>
AuthorDate: Wed Jan 17 15:38:26 2018 -0800
GEODE-4285: Get a distributed lock if we can't find a PDX type
If we are unable to find a PDX type during a get, will we get a
distributed lock and try again. This prevents races where the type may
be in the middle of distribution.
Adding a dunit test for the race condition that requires this fix.
---
.../geode/pdx/internal/PeerTypeRegistration.java | 26 ++-
.../cache/wan/misc/PDXNewWanDUnitTest.java | 189 +++++++++++++++++++++
2 files changed, 206 insertions(+), 9 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/pdx/internal/PeerTypeRegistration.java b/geode-core/src/main/java/org/apache/geode/pdx/internal/PeerTypeRegistration.java
index 0752066..32e4c31 100644
--- a/geode-core/src/main/java/org/apache/geode/pdx/internal/PeerTypeRegistration.java
+++ b/geode-core/src/main/java/org/apache/geode/pdx/internal/PeerTypeRegistration.java
@@ -446,13 +446,27 @@ public class PeerTypeRegistration implements TypeRegistration {
}
public PdxType getType(int typeId) {
+ return getById(typeId);
+ }
+
+ private <T> T getById(Object typeId) {
verifyConfiguration();
TXStateProxy currentState = suspendTX();
try {
- return (PdxType) getIdToType().get(typeId);
+ T pdxType = (T) getIdToType().get(typeId);
+ if (pdxType == null) {
+ lock();
+ try {
+ pdxType = (T) getIdToType().get(typeId);
+ } finally {
+ unlock();
+ }
+ }
+ return pdxType;
} finally {
resumeTX(currentState);
}
+
}
public void addRemoteType(int typeId, PdxType type) {
@@ -466,7 +480,7 @@ public class PeerTypeRegistration implements TypeRegistration {
// the distributed lock.
lock();
try {
- r.put(typeId, type);
+ r.putIfAbsent(typeId, type);
} finally {
unlock();
}
@@ -703,14 +717,8 @@ public class PeerTypeRegistration implements TypeRegistration {
}
public EnumInfo getEnumById(int id) {
- verifyConfiguration();
EnumId enumId = new EnumId(id);
- TXStateProxy currentState = suspendTX();
- try {
- return (EnumInfo) getIdToType().get(enumId);
- } finally {
- resumeTX(currentState);
- }
+ return getById(enumId);
}
@Override
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/PDXNewWanDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/PDXNewWanDUnitTest.java
index abcd142..be027c9 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/PDXNewWanDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/PDXNewWanDUnitTest.java
@@ -16,14 +16,31 @@ package org.apache.geode.internal.cache.wan.misc;
import static org.junit.Assert.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.awaitility.Awaitility;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
+import org.apache.geode.internal.cache.UpdateOperation;
import org.apache.geode.internal.cache.wan.WANTestBase;
+import org.apache.geode.pdx.PdxClientServerDUnitTest;
+import org.apache.geode.pdx.PdxReader;
+import org.apache.geode.pdx.PdxSerializable;
+import org.apache.geode.pdx.PdxWriter;
+import org.apache.geode.pdx.internal.PeerTypeRegistration;
+import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
@@ -35,6 +52,7 @@ import org.apache.geode.test.junit.categories.FlakyTest;
public class PDXNewWanDUnitTest extends WANTestBase {
private static final long serialVersionUID = 1L;
+ public static final String KEY_0 = "Key_0";
public PDXNewWanDUnitTest() {
super();
@@ -497,6 +515,115 @@ public class PDXNewWanDUnitTest extends WANTestBase {
}
@Test
+ public void testWANPDX_PR_ParallelSender_WithDelayedTypeRegistry()
+ throws InterruptedException, ExecutionException {
+ Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+ // Create the receiver side of the WAN gateway. Only vm2 will be a receiver, vm3 is
+ // just a peer
+ createCacheInVMs(nyPort, vm2, vm3);
+ vm2.invoke(() -> WANTestBase.createReceiver());
+ vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 0, 4,
+ isOffHeap()));
+ vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 0, 4,
+ isOffHeap()));
+
+ AsyncInvocation deserializationFuture;
+ try {
+ // Delay processing of sending type registry update from vm2
+ vm2.invoke(() -> {
+ DistributionMessageObserver.setInstance(new BlockingPdxTypeUpdateObserver());
+ });
+
+ // Create the sender side of the WAN connection. 2 VMs, with paused senders
+ vm4.invoke(() -> WANTestBase.createCache(lnPort));
+ vm5.invoke(() -> WANTestBase.createCache(lnPort));
+
+ vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, false));
+ vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, false));
+
+ // Create the partitioned region in vm4
+ vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 0, 4,
+ isOffHeap()));
+
+ vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 0, 4,
+ isOffHeap()));
+
+ vm5.invoke(() -> {
+ Region region = cache.getRegion(getTestMethodName() + "_PR");
+ PartitionRegionHelper.assignBucketsToPartitions(region);
+ });
+
+ vm4.invoke(() -> WANTestBase.pauseSender("ln"));
+ vm5.invoke(() -> WANTestBase.pauseSender("ln"));
+
+ // Do some puts to fill up our queues
+ vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 20));
+
+ vm4.invoke(() -> {
+ final Region r = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_PR");
+ PdxValue result = (PdxValue) r.put(KEY_0, new PdxValue(0));
+ });
+
+ // Force VM4 to be the primary
+ vm4.invoke(() -> {
+ final Region region = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_PR");
+ DistributedMember primary = PartitionRegionHelper.getPrimaryMemberForKey(region, KEY_0);
+ // If we are not the primary
+ DistributedMember localMember = cache.getDistributedSystem().getDistributedMember();
+ if (!primary.equals(localMember)) {
+ PartitionRegionHelper.moveBucketByKey(region, primary, localMember, KEY_0);
+
+ }
+ });
+
+ vm5.invoke(() -> WANTestBase.resumeSender("ln"));
+
+ boolean blocking = vm2.invoke(() -> {
+ BlockingPdxTypeUpdateObserver observer =
+ (BlockingPdxTypeUpdateObserver) DistributionMessageObserver.getInstance();
+ return observer.startedBlocking.await(1, TimeUnit.MINUTES);
+ });
+
+ assertTrue(blocking);
+
+ vm4.invoke(() -> WANTestBase.resumeSender("ln"));
+
+ vm2.invoke(() -> {
+ final Region region = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_PR");
+ Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> region.containsKey(KEY_0));
+
+ });
+
+ // Make sure vm3 can deserialize the value
+ deserializationFuture = vm3.invokeAsync(() -> {
+ final Region r = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_PR");
+ PdxValue result = (PdxValue) r.get(KEY_0);
+ assertEquals(result, new PdxValue(0));
+ });
+
+ try {
+ deserializationFuture.await(10, TimeUnit.SECONDS);
+ fail("Get should have been blocked waiting for PDX type to be distributed");
+ } catch (TimeoutException e) {
+ // This is what we hope will happen. The get will be blocked by some sort of lock, rather
+ // than failing due to a missing type.
+ }
+
+ } finally {
+
+ vm2.invoke(() -> {
+ BlockingPdxTypeUpdateObserver observer =
+ (BlockingPdxTypeUpdateObserver) DistributionMessageObserver.getInstance();
+ observer.latch.countDown();
+ });
+ }
+
+ deserializationFuture.get();
+ }
+
+ @Test
public void testWANPDX_PR_ParallelSender_47826() {
Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
@@ -724,5 +851,67 @@ public class PDXNewWanDUnitTest extends WANTestBase {
}
+ private static class BlockingPdxTypeUpdateObserver extends DistributionMessageObserver {
+ private CountDownLatch latch = new CountDownLatch(1);
+ private CountDownLatch startedBlocking = new CountDownLatch(1);
+
+ @Override
+ public void beforeSendMessage(ClusterDistributionManager dm, DistributionMessage message) {
+ if (message instanceof UpdateOperation.UpdateMessage
+ && ((UpdateOperation.UpdateMessage) message).getRegionPath()
+ .contains(PeerTypeRegistration.REGION_FULL_PATH)) {
+ startedBlocking.countDown();
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted", e);
+ }
+
+ }
+ }
+ }
+
+ public static class PdxValue implements PdxSerializable {
+ public int value;
+
+ public PdxValue() {
+
+ }
+
+ public PdxValue(int value) {
+ this.value = value;
+ }
+
+ @Override
+ public void toData(PdxWriter writer) {
+ writer.writeInt("value", value);
+
+ }
+
+ @Override
+ public void fromData(PdxReader reader) {
+ value = reader.readInt("value");
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ PdxValue pdxValue = (PdxValue) o;
+
+ return value == pdxValue.value;
+ }
+
+ @Override
+ public int hashCode() {
+ return value;
+ }
+ }
+
}
--
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <co...@geode.apache.org>'].