You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2019/06/28 07:09:39 UTC

[geode] 01/01: GEODE-6908: retried REMOVE should not create new version tag. added retry dunit tests for all the c/s operations.

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-6908
in repository https://gitbox.apache.org/repos/asf/geode.git

commit c57d15803dd9c9146af92a027724eaf777aa22d4
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Fri Jun 28 00:06:11 2019 -0700

    GEODE-6908: A retried REMOVE should not create a new version tag.
                Added retry dunit tests for all the client/server operations.
---
 .../pdx/ClientsWithVersioningRetryDUnitTest.java   | 285 +++++++++++++++++----
 .../apache/geode/internal/cache/LocalRegion.java   |   9 +-
 2 files changed, 238 insertions(+), 56 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/pdx/ClientsWithVersioningRetryDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/pdx/ClientsWithVersioningRetryDUnitTest.java
index e992d21..5beab36 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/pdx/ClientsWithVersioningRetryDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/pdx/ClientsWithVersioningRetryDUnitTest.java
@@ -49,6 +49,7 @@ import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.DistributionMessageObserver;
 import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.cache.DistributedCacheOperation;
 import org.apache.geode.internal.cache.DistributedPutAllOperation;
 import org.apache.geode.internal.cache.DistributedRegion;
 import org.apache.geode.internal.cache.EntryEventImpl;
@@ -117,7 +118,6 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
     final VM vm0 = host.getVM(0);
     final VM vm1 = host.getVM(1);
 
-
     createServerRegion(vm0, RegionShortcut.REPLICATE);
     createServerRegion(vm1, RegionShortcut.REPLICATE);
 
@@ -160,7 +160,8 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
           event.setContext(new ClientProxyMembershipID(memberID));
           boolean recovered =
               ((BaseCommand) Put70.getCommand()).recoverVersionTagForRetriedOperation(event);
-          assertTrue("Expected to recover the version for this event ID", recovered);
+          assertTrue("Expected to recover the version for this event ID",
+              recovered);
           assertEquals("Expected the region version to be 123", 123,
               event.getVersionTag().getRegionVersion());
         } finally {
@@ -173,10 +174,12 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
     vm1.invoke(new SerializableRunnable("recover posdup event tag in vm1 event tracker from vm0") {
       @Override
       public void run() {
-        DistributedRegion dr = (DistributedRegion) getCache().getRegion("region");
+        DistributedRegion dr = (DistributedRegion) getCache()
+            .getRegion("region");
         EventID eventID = new EventID(new byte[0], 1, 0);
-        EntryEventImpl event = EntryEventImpl.create(dr, Operation.CREATE, "TestObject",
-            "TestValue", null, false, memberID, true, eventID);
+        EntryEventImpl event = EntryEventImpl
+            .create(dr, Operation.CREATE, "TestObject",
+                "TestValue", null, false, memberID, true, eventID);
         event.setPossibleDuplicate(true);
         try {
           dr.hasSeenEvent(event);
@@ -189,6 +192,162 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
     });
   }
 
+  enum OpType {
+    CREATE,
+    PUT,
+    DESTROY,
+    INVALIDATE,
+    PUT_IF_ABSENT,
+    REPLACE,
+    REPLACE_WITH_OLDVALUE,
+    REMOVE
+  };
+
+  @Test
+  public void testRetriedCreate() {
+    doRetriedTest(OpType.CREATE);
+  }
+
+  @Test
+  public void testRetriedPut() {
+    doRetriedTest(OpType.PUT);
+  }
+
+  @Test
+  public void testRetriedDestroy() {
+    doRetriedTest(OpType.DESTROY);
+  }
+
+  @Test
+  public void testRetriedInvalidate() {
+    doRetriedTest(OpType.INVALIDATE);
+  }
+
+  @Test
+  public void testRetriedPutIfAbsent() {
+    doRetriedTest(OpType.PUT_IF_ABSENT);
+  }
+
+  @Test
+  public void testRetriedReplace() {
+    doRetriedTest(OpType.REPLACE);
+  }
+
+  @Test
+  public void testRetriedReplaceWithOldValue() {
+    doRetriedTest(OpType.REPLACE_WITH_OLDVALUE);
+  }
+
+  @Test
+  public void testRetriedRemove() {
+    doRetriedTest(OpType.REMOVE);
+  }
+
+  private void doRetriedTest(final OpType opType) {
+    Host host = Host.getHost(0);
+    final VM vm0 = host.getVM(0);
+    final VM vm1 = host.getVM(1);
+    final VM vm3 = host.getVM(3);
+
+    int port0 = createServerRegion(vm0, RegionShortcut.REPLICATE);
+    int port1 = createServerRegion(vm1, RegionShortcut.REPLICATE);
+    createClientRegion(vm3, port0, port1);
+
+    vm0.invoke(new SerializableRunnable() {
+
+      @Override
+      public void run() {
+        Region region = getCache().getRegion("region");
+        if (opType != OpType.CREATE && opType != OpType.PUT_IF_ABSENT) {
+          region.put(0, "value");
+        }
+
+        // Add a listener to close vm0 when we send a distributed operation.
+        // This will cause the client to retry the operation after the
+        // original has already been applied to the cache.
+        DistributionMessageObserver
+            .setInstance(new DistributionMessageObserver() {
+
+              @Override
+              public void beforeSendMessage(ClusterDistributionManager dm,
+                  DistributionMessage message) {
+                if (message instanceof DistributedCacheOperation.CacheOperationMessage) {
+                  DistributedCacheOperation.CacheOperationMessage com =
+                      (DistributedCacheOperation.CacheOperationMessage) message;
+                  VersionTag tag = com.getVersionTag();
+                  if (((opType == OpType.CREATE || opType == OpType.PUT_IF_ABSENT)
+                      && tag.getEntryVersion() == 1) || tag.getEntryVersion() == 2) {
+                    DistributionMessageObserver.setInstance(null);
+                    disconnectFromDS(vm0);
+                  }
+                }
+              }
+            });
+
+      }
+    });
+
+    // this client operation will trigger vm0 to be closed, and the operation will be retried
+    vm3.invoke(new SerializableCallable("perform update in client") {
+      @Override
+      public Object call() throws Exception {
+        Region region = getCache().getRegion("region");
+        switch (opType) {
+          case CREATE:
+            region.create(0, "newvalue");
+            break;
+          case PUT:
+            region.put(0, "newvalue");
+            break;
+          case DESTROY:
+            region.destroy(0);
+            break;
+          case INVALIDATE:
+            region.invalidate(0);
+            break;
+          case PUT_IF_ABSENT:
+            region.putIfAbsent(0, "newvalue");
+            break;
+          case REPLACE:
+            region.replace(0, "newvalue");
+            break;
+          case REPLACE_WITH_OLDVALUE:
+            region.replace(0, "value", "newvalue");
+            break;
+          case REMOVE:
+            region.remove(0, "value");
+            break;
+        }
+        return null;
+      }
+    });
+
+    // Verify vm1's observer state and the version tag recorded after the retry
+    vm1.invoke(new SerializableRunnable() {
+
+      @Override
+      public void run() {
+        // if the observer was triggered, it would have cleared itself
+        assertNull(DistributionMessageObserver.getInstance());
+        VersionTag tag = ((LocalRegion) getCache().getRegion("region"))
+            .getVersionTag(0);
+        if (opType == OpType.CREATE || opType == OpType.PUT_IF_ABSENT) {
+          assertEquals(1, tag.getRegionVersion());
+        } else {
+          assertEquals(2, tag.getRegionVersion());
+        }
+      }
+    });
+
+    // Make sure vm0 did in fact shut down
+    vm0.invoke(new SerializableRunnable() {
+      @Override
+      public void run() {
+        GemFireCacheImpl cache = (GemFireCacheImpl) basicGetCache();
+        assertTrue(cache == null || cache.isClosed());
+      }
+    });
+  }
 
   /**
    * Test that we can successfully retry a distributed put all and get the version information. bug
@@ -202,7 +361,6 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
     final VM vm2 = host.getVM(2);
     final VM vm3 = host.getVM(3);
 
-
     createServerRegion(vm0, RegionShortcut.PARTITION_REDUNDANT_PERSISTENT);
     vm0.invoke(new SerializableRunnable() {
 
@@ -215,27 +373,28 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
         // Add a listener to close vm1 when we send a distributed put all operation
         // this will cause a retry after we have applied the original put all to
         // the cache, causing a retry
-        DistributionMessageObserver.setInstance(new DistributionMessageObserver() {
-
-          @Override
-          public void beforeSendMessage(ClusterDistributionManager dm,
-              DistributionMessage message) {
-            if (message instanceof DistributedPutAllOperation.PutAllMessage) {
-              DistributionMessageObserver.setInstance(null);
-              disconnectFromDS(vm1);
-            }
-          }
-        });
+        DistributionMessageObserver
+            .setInstance(new DistributionMessageObserver() {
+
+              @Override
+              public void beforeSendMessage(ClusterDistributionManager dm,
+                  DistributionMessage message) {
+                if (message instanceof DistributedPutAllOperation.PutAllMessage) {
+                  DistributionMessageObserver.setInstance(null);
+                  disconnectFromDS(vm1);
+                }
+              }
+            });
 
       }
     });
 
-    int port1 = createServerRegion(vm1, RegionShortcut.PARTITION_REDUNDANT_PERSISTENT);
-    int port2 = createServerRegion(vm2, RegionShortcut.PARTITION_REDUNDANT_PERSISTENT);
+    int port1 = createServerRegion(vm1,
+        RegionShortcut.PARTITION_REDUNDANT_PERSISTENT);
+    int port2 = createServerRegion(vm2,
+        RegionShortcut.PARTITION_REDUNDANT_PERSISTENT);
     createClientRegion(vm3, port1, port2);
 
-
-
     // This will be a put all to bucket 0
     // Here's the expected sequence
     // client->vm1 (accessor0)
@@ -310,31 +469,37 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
     LogWriterUtils.getLogWriter().info("creating region in vm3");
     createRegionInPeer(vm3, RegionShortcut.PARTITION_PROXY);
 
-    expectedExceptions.add(IgnoredException.addIgnoredException("RuntimeException", vm2));
-    vm2.invoke(new SerializableRunnable("install message listener to ignore update") {
-      @Override
-      public void run() {
-        // Add a listener to close vm2 when we send a distributed put all operation
-        // this will cause a retry after we have applied the original put all to
-        // the cache, causing a retry
-        DistributionMessageObserver.setInstance(new DistributionMessageObserver() {
-
+    expectedExceptions
+        .add(IgnoredException.addIgnoredException("RuntimeException", vm2));
+    vm2.invoke(
+        new SerializableRunnable("install message listener to ignore update") {
           @Override
-          public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage msg) {
-            if (msg instanceof DistributedPutAllOperation.PutAllMessage) {
-              DistributionMessageObserver.setInstance(null);
-              Wait.pause(5000); // give vm1 time to process the message that we're ignoring
-              disconnectFromDS(vm0);
-              // no reply will be sent to vm0 due to this exception, but that's okay
-              // because vm0 has been shut down
-              throw new RuntimeException("test code is ignoring message: " + msg);
-            }
+          public void run() {
+            // Add a listener that closes vm0 when vm2 processes a distributed put all
+            // message; this will cause a retry after the original put all has been
+            // applied to the cache
+            DistributionMessageObserver
+                .setInstance(new DistributionMessageObserver() {
+
+                  @Override
+                  public void beforeProcessMessage(ClusterDistributionManager dm,
+                      DistributionMessage msg) {
+                    if (msg instanceof DistributedPutAllOperation.PutAllMessage) {
+                      DistributionMessageObserver.setInstance(null);
+                      Wait.pause(
+                          5000); // give vm1 time to process the message that we're ignoring
+                      disconnectFromDS(vm0);
+                      // no reply will be sent to vm0 due to this exception, but that's okay
+                      // because vm0 has been shut down
+                      throw new RuntimeException(
+                          "test code is ignoring message: " + msg);
+                    }
+                  }
+                });
+
           }
         });
 
-      }
-    });
-
     // This will be a put all to bucket 0
     // Here's the expected sequence
     // accessor->vm0 (primary)
@@ -354,7 +519,6 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
       }
     });
 
-
     // verify that the version is correct
     vm1.invoke(new SerializableRunnable("verify vm1") {
 
@@ -369,7 +533,6 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
       }
     });
 
-
     // Verify the observer was triggered and the version is correct
     vm2.invoke(new SerializableRunnable("verify vm2") {
 
@@ -404,13 +567,13 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
     });
   }
 
-
-
   private int createServerRegion(VM vm, final RegionShortcut shortcut) {
-    SerializableCallable createRegion = new SerializableCallable("create server region") {
+    SerializableCallable createRegion = new SerializableCallable(
+        "create server region") {
       @Override
       public Object call() throws Exception {
-        RegionFactory<Object, Object> rf = getCache().createRegionFactory(shortcut);
+        RegionFactory<Object, Object> rf = getCache()
+            .createRegionFactory(shortcut);
         if (!shortcut.equals(RegionShortcut.REPLICATE)) {
           rf.setPartitionAttributes(
               new PartitionAttributesFactory().setRedundantCopies(2).create());
@@ -429,10 +592,12 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
   }
 
   private void createRegionInPeer(VM vm, final RegionShortcut shortcut) {
-    SerializableCallable createRegion = new SerializableCallable("create peer region") {
+    SerializableCallable createRegion = new SerializableCallable(
+        "create peer region") {
       @Override
       public Object call() throws Exception {
-        RegionFactory<Object, Object> rf = getCache().createRegionFactory(shortcut);
+        RegionFactory<Object, Object> rf = getCache()
+            .createRegionFactory(shortcut);
         if (!shortcut.equals(RegionShortcut.REPLICATE)) {
           rf.setPartitionAttributes(
               new PartitionAttributesFactory().setRedundantCopies(2).create());
@@ -451,7 +616,8 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
     return p;
   }
 
-  private int createServerRegionWithPersistence(VM vm, final boolean persistentPdxRegistry) {
+  private int createServerRegionWithPersistence(VM vm,
+      final boolean persistentPdxRegistry) {
     SerializableCallable createRegion = new SerializableCallable() {
       @Override
       public Object call() throws Exception {
@@ -461,7 +627,8 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
         }
         //
         Cache cache = getCache(cf);
-        cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("store");
+        cache.createDiskStoreFactory().setDiskDirs(getDiskDirs())
+            .create("store");
 
         AttributesFactory af = new AttributesFactory();
         af.setScope(Scope.DISTRIBUTED_ACK);
@@ -500,17 +667,25 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
     return (Integer) vm.invoke(createRegion);
   }
 
-  private void createClientRegion(final VM vm, final int port1, final int port2) {
-    SerializableCallable createRegion = new SerializableCallable("create client region in " + vm) {
+  private void createClientRegion(final VM vm, final int port1,
+      final int port2) {
+    SerializableCallable createRegion = new SerializableCallable(
+        "create client region in " + vm) {
       @Override
       public Object call() throws Exception {
         ClientCacheFactory cf = new ClientCacheFactory();
+
         cf.addPoolServer(NetworkUtils.getServerHostName(vm.getHost()), port1);
         cf.addPoolServer(NetworkUtils.getServerHostName(vm.getHost()), port2);
         cf.setPoolPRSingleHopEnabled(false);
         cf.setPoolReadTimeout(10 * 60 * 1000);
+        cf.setPoolSubscriptionEnabled(true);
+
         ClientCache cache = getClientCache(cf);
-        cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("region");
+        Region region =
+            cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
+                .create("region");
+        region.registerInterest("ALL_KEYS");
         return null;
       }
     };
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 3a70b9f..c958d7c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -3101,7 +3101,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     return result;
   }
 
-  private void cacheWriteBeforeRegionClear(RegionEventImpl event)
+  protected void cacheWriteBeforeRegionClear(RegionEventImpl event)
       throws CacheWriterException, TimeoutException {
     // copy into local var to prevent race condition
     CacheWriter writer = basicGetWriter();
@@ -5428,6 +5428,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
       event.setContext(memberId);
       // if this is a replayed or WAN operation we may already have a version tag
       event.setVersionTag(clientEvent.getVersionTag());
+      event.setPossibleDuplicate(clientEvent.isPossibleDuplicate());
       try {
         basicDestroy(event, true, null);
       } catch (ConcurrentCacheModificationException ignore) {
@@ -5459,6 +5460,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
 
       // if this is a replayed operation we may already have a version tag
       event.setVersionTag(clientEvent.getVersionTag());
+      event.setPossibleDuplicate(clientEvent.isPossibleDuplicate());
 
       try {
         basicInvalidate(event);
@@ -5485,6 +5487,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
 
     // if this is a replayed operation we may already have a version tag
     event.setVersionTag(clientEvent.getVersionTag());
+    event.setPossibleDuplicate(clientEvent.isPossibleDuplicate());
 
     try {
       basicUpdateEntryVersion(event);
@@ -8429,6 +8432,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     }
 
     RegionVersionVector myVector = getVersionVector();
+    System.out.println(Thread.currentThread().getName() + ": LocalRegion.clearRegionLocally region="
+        + getName() + "; myVector=" + myVector);
     if (myVector != null) {
       if (isRvvDebugEnabled) {
         logger.trace(LogMarker.RVV_VERBOSE, "processing version information for {}", regionEvent);
@@ -10764,6 +10769,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
 
     try {
       event.setContext(memberId);
+      event.setVersionTag(clientEvent.getVersionTag());
+      event.setPossibleDuplicate(clientEvent.isPossibleDuplicate());
       // we rely on exceptions to tell us that the operation didn't take
       // place. AbstractRegionMap performs the checks and throws the exception
       try {