You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2018/01/17 23:38:31 UTC

[geode] branch develop updated: GEODE-4285: Get a distributed lock if we can't find a PDX type

This is an automated email from the ASF dual-hosted git repository.

upthewaterspout pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new da42607  GEODE-4285: Get a distributed lock if we can't find a PDX type
da42607 is described below

commit da426077cc621196907df59c6a9e5f7dce907333
Author: Dan Smith <ds...@pivotal.io>
AuthorDate: Wed Jan 17 15:38:26 2018 -0800

    GEODE-4285: Get a distributed lock if we can't find a PDX type
    
    If we are unable to find a PDX type during a get, will we get a
    distributed lock and try again. This prevents races where the type may
    be in the middle of distribution.
    
    Adding a dunit test for the race condition that requires this fix.
---
 .../geode/pdx/internal/PeerTypeRegistration.java   |  26 ++-
 .../cache/wan/misc/PDXNewWanDUnitTest.java         | 189 +++++++++++++++++++++
 2 files changed, 206 insertions(+), 9 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/pdx/internal/PeerTypeRegistration.java b/geode-core/src/main/java/org/apache/geode/pdx/internal/PeerTypeRegistration.java
index 0752066..32e4c31 100644
--- a/geode-core/src/main/java/org/apache/geode/pdx/internal/PeerTypeRegistration.java
+++ b/geode-core/src/main/java/org/apache/geode/pdx/internal/PeerTypeRegistration.java
@@ -446,13 +446,27 @@ public class PeerTypeRegistration implements TypeRegistration {
   }
 
   public PdxType getType(int typeId) {
+    return getById(typeId);
+  }
+
+  private <T> T getById(Object typeId) {
     verifyConfiguration();
     TXStateProxy currentState = suspendTX();
     try {
-      return (PdxType) getIdToType().get(typeId);
+      T pdxType = (T) getIdToType().get(typeId);
+      if (pdxType == null) {
+        lock();
+        try {
+          pdxType = (T) getIdToType().get(typeId);
+        } finally {
+          unlock();
+        }
+      }
+      return pdxType;
     } finally {
       resumeTX(currentState);
     }
+
   }
 
   public void addRemoteType(int typeId, PdxType type) {
@@ -466,7 +480,7 @@ public class PeerTypeRegistration implements TypeRegistration {
         // the distributed lock.
         lock();
         try {
-          r.put(typeId, type);
+          r.putIfAbsent(typeId, type);
         } finally {
           unlock();
         }
@@ -703,14 +717,8 @@ public class PeerTypeRegistration implements TypeRegistration {
   }
 
   public EnumInfo getEnumById(int id) {
-    verifyConfiguration();
     EnumId enumId = new EnumId(id);
-    TXStateProxy currentState = suspendTX();
-    try {
-      return (EnumInfo) getIdToType().get(enumId);
-    } finally {
-      resumeTX(currentState);
-    }
+    return getById(enumId);
   }
 
   @Override
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/PDXNewWanDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/PDXNewWanDUnitTest.java
index abcd142..be027c9 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/PDXNewWanDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/PDXNewWanDUnitTest.java
@@ -16,14 +16,31 @@ package org.apache.geode.internal.cache.wan.misc;
 
 import static org.junit.Assert.*;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.awaitility.Awaitility;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
+import org.apache.geode.internal.cache.UpdateOperation;
 import org.apache.geode.internal.cache.wan.WANTestBase;
+import org.apache.geode.pdx.PdxClientServerDUnitTest;
+import org.apache.geode.pdx.PdxReader;
+import org.apache.geode.pdx.PdxSerializable;
+import org.apache.geode.pdx.PdxWriter;
+import org.apache.geode.pdx.internal.PeerTypeRegistration;
+import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.IgnoredException;
 import org.apache.geode.test.dunit.Wait;
 import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
@@ -35,6 +52,7 @@ import org.apache.geode.test.junit.categories.FlakyTest;
 public class PDXNewWanDUnitTest extends WANTestBase {
 
   private static final long serialVersionUID = 1L;
+  public static final String KEY_0 = "Key_0";
 
   public PDXNewWanDUnitTest() {
     super();
@@ -497,6 +515,115 @@ public class PDXNewWanDUnitTest extends WANTestBase {
   }
 
   @Test
+  public void testWANPDX_PR_ParallelSender_WithDelayedTypeRegistry()
+      throws InterruptedException, ExecutionException {
+    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    // Create the receiver side of the WAN gateway. Only vm2 will be a receiver, vm3 is
+    // just a peer
+    createCacheInVMs(nyPort, vm2, vm3);
+    vm2.invoke(() -> WANTestBase.createReceiver());
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 0, 4,
+        isOffHeap()));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 0, 4,
+        isOffHeap()));
+
+    AsyncInvocation deserializationFuture;
+    try {
+      // Delay processing of sending type registry update from vm2
+      vm2.invoke(() -> {
+        DistributionMessageObserver.setInstance(new BlockingPdxTypeUpdateObserver());
+      });
+
+      // Create the sender side of the WAN connection. 2 VMs, with paused senders
+      vm4.invoke(() -> WANTestBase.createCache(lnPort));
+      vm5.invoke(() -> WANTestBase.createCache(lnPort));
+
+      vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, false));
+      vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, false));
+
+      // Create the partitioned region in vm4
+      vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 0, 4,
+          isOffHeap()));
+
+      vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 0, 4,
+          isOffHeap()));
+
+      vm5.invoke(() -> {
+        Region region = cache.getRegion(getTestMethodName() + "_PR");
+        PartitionRegionHelper.assignBucketsToPartitions(region);
+      });
+
+      vm4.invoke(() -> WANTestBase.pauseSender("ln"));
+      vm5.invoke(() -> WANTestBase.pauseSender("ln"));
+
+      // Do some puts to fill up our queues
+      vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 20));
+
+      vm4.invoke(() -> {
+        final Region r = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_PR");
+        PdxValue result = (PdxValue) r.put(KEY_0, new PdxValue(0));
+      });
+
+      // Force VM4 to be the primary
+      vm4.invoke(() -> {
+        final Region region = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_PR");
+        DistributedMember primary = PartitionRegionHelper.getPrimaryMemberForKey(region, KEY_0);
+        // If we are not the primary
+        DistributedMember localMember = cache.getDistributedSystem().getDistributedMember();
+        if (!primary.equals(localMember)) {
+          PartitionRegionHelper.moveBucketByKey(region, primary, localMember, KEY_0);
+
+        }
+      });
+
+      vm5.invoke(() -> WANTestBase.resumeSender("ln"));
+
+      boolean blocking = vm2.invoke(() -> {
+        BlockingPdxTypeUpdateObserver observer =
+            (BlockingPdxTypeUpdateObserver) DistributionMessageObserver.getInstance();
+        return observer.startedBlocking.await(1, TimeUnit.MINUTES);
+      });
+
+      assertTrue(blocking);
+
+      vm4.invoke(() -> WANTestBase.resumeSender("ln"));
+
+      vm2.invoke(() -> {
+        final Region region = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_PR");
+        Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> region.containsKey(KEY_0));
+
+      });
+
+      // Make sure vm3 can deserialize the value
+      deserializationFuture = vm3.invokeAsync(() -> {
+        final Region r = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_PR");
+        PdxValue result = (PdxValue) r.get(KEY_0);
+        assertEquals(result, new PdxValue(0));
+      });
+
+      try {
+        deserializationFuture.await(10, TimeUnit.SECONDS);
+        fail("Get should have been blocked waiting for PDX type to be distributed");
+      } catch (TimeoutException e) {
+        // This is what we hope will happen. The get will be blocked by some sort of lock, rather
+        // than failing due to a missing type.
+      }
+
+    } finally {
+
+      vm2.invoke(() -> {
+        BlockingPdxTypeUpdateObserver observer =
+            (BlockingPdxTypeUpdateObserver) DistributionMessageObserver.getInstance();
+        observer.latch.countDown();
+      });
+    }
+
+    deserializationFuture.get();
+  }
+
+  @Test
   public void testWANPDX_PR_ParallelSender_47826() {
     Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
     Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
@@ -724,5 +851,67 @@ public class PDXNewWanDUnitTest extends WANTestBase {
   }
 
 
+  private static class BlockingPdxTypeUpdateObserver extends DistributionMessageObserver {
+    private CountDownLatch latch = new CountDownLatch(1);
+    private CountDownLatch startedBlocking = new CountDownLatch(1);
+
+    @Override
+    public void beforeSendMessage(ClusterDistributionManager dm, DistributionMessage message) {
+      if (message instanceof UpdateOperation.UpdateMessage
+          && ((UpdateOperation.UpdateMessage) message).getRegionPath()
+              .contains(PeerTypeRegistration.REGION_FULL_PATH)) {
+        startedBlocking.countDown();
+        try {
+          latch.await();
+        } catch (InterruptedException e) {
+          throw new RuntimeException("Interrupted", e);
+        }
+
+      }
+    }
+  }
+
+  public static class PdxValue implements PdxSerializable {
+    public int value;
+
+    public PdxValue() {
+
+    }
+
+    public PdxValue(int value) {
+      this.value = value;
+    }
+
+    @Override
+    public void toData(PdxWriter writer) {
+      writer.writeInt("value", value);
+
+    }
+
+    @Override
+    public void fromData(PdxReader reader) {
+      value = reader.readInt("value");
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      PdxValue pdxValue = (PdxValue) o;
+
+      return value == pdxValue.value;
+    }
+
+    @Override
+    public int hashCode() {
+      return value;
+    }
+  }
+
 
 }

-- 
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <co...@geode.apache.org>'].