You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2019/06/28 07:09:38 UTC

[geode] branch feature/GEODE-6908 created (now c57d158)

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a change to branch feature/GEODE-6908
in repository https://gitbox.apache.org/repos/asf/geode.git.


      at c57d158  GEODE-6908: retried REMOVE should not create new version tag.             added retry dunit tests for all the c/s operations.

This branch includes the following new commits:

     new c57d158  GEODE-6908: retried REMOVE should not create new version tag.             added retry dunit tests for all the c/s operations.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[geode] 01/01: GEODE-6908: retried REMOVE should not create new version tag. added retry dunit tests for all the c/s operations.

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-6908
in repository https://gitbox.apache.org/repos/asf/geode.git

commit c57d15803dd9c9146af92a027724eaf777aa22d4
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Fri Jun 28 00:06:11 2019 -0700

    GEODE-6908: retried REMOVE should not create new version tag.
                added retry dunit tests for all the c/s operations.
---
 .../pdx/ClientsWithVersioningRetryDUnitTest.java   | 285 +++++++++++++++++----
 .../apache/geode/internal/cache/LocalRegion.java   |   9 +-
 2 files changed, 238 insertions(+), 56 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/pdx/ClientsWithVersioningRetryDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/pdx/ClientsWithVersioningRetryDUnitTest.java
index e992d21..5beab36 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/pdx/ClientsWithVersioningRetryDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/pdx/ClientsWithVersioningRetryDUnitTest.java
@@ -49,6 +49,7 @@ import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.DistributionMessageObserver;
 import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.cache.DistributedCacheOperation;
 import org.apache.geode.internal.cache.DistributedPutAllOperation;
 import org.apache.geode.internal.cache.DistributedRegion;
 import org.apache.geode.internal.cache.EntryEventImpl;
@@ -117,7 +118,6 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
     final VM vm0 = host.getVM(0);
     final VM vm1 = host.getVM(1);
 
-
     createServerRegion(vm0, RegionShortcut.REPLICATE);
     createServerRegion(vm1, RegionShortcut.REPLICATE);
 
@@ -160,7 +160,8 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
           event.setContext(new ClientProxyMembershipID(memberID));
           boolean recovered =
               ((BaseCommand) Put70.getCommand()).recoverVersionTagForRetriedOperation(event);
-          assertTrue("Expected to recover the version for this event ID", recovered);
+          assertTrue("Expected to recover the version for this event ID",
+              recovered);
           assertEquals("Expected the region version to be 123", 123,
               event.getVersionTag().getRegionVersion());
         } finally {
@@ -173,10 +174,12 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
     vm1.invoke(new SerializableRunnable("recover posdup event tag in vm1 event tracker from vm0") {
       @Override
       public void run() {
-        DistributedRegion dr = (DistributedRegion) getCache().getRegion("region");
+        DistributedRegion dr = (DistributedRegion) getCache()
+            .getRegion("region");
         EventID eventID = new EventID(new byte[0], 1, 0);
-        EntryEventImpl event = EntryEventImpl.create(dr, Operation.CREATE, "TestObject",
-            "TestValue", null, false, memberID, true, eventID);
+        EntryEventImpl event = EntryEventImpl
+            .create(dr, Operation.CREATE, "TestObject",
+                "TestValue", null, false, memberID, true, eventID);
         event.setPossibleDuplicate(true);
         try {
           dr.hasSeenEvent(event);
@@ -189,6 +192,162 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
     });
   }
 
+  enum OpType {
+    CREATE,
+    PUT,
+    DESTROY,
+    INVALIDATE,
+    PUT_IF_ABSENT,
+    REPLACE,
+    REPLACE_WITH_OLDVALUE,
+    REMOVE
+  };
+
+  @Test
+  public void testRetriedCreate() {
+    doRetriedTest(OpType.CREATE);
+  }
+
+  @Test
+  public void testRetriedPut() {
+    doRetriedTest(OpType.PUT);
+  }
+
+  @Test
+  public void testRetriedDestroy() {
+    doRetriedTest(OpType.DESTROY);
+  }
+
+  @Test
+  public void testRetriedInvalidate() {
+    doRetriedTest(OpType.INVALIDATE);
+  }
+
+  @Test
+  public void testRetriedPutIfAbsent() {
+    doRetriedTest(OpType.PUT_IF_ABSENT);
+  }
+
+  @Test
+  public void testRetriedReplace() {
+    doRetriedTest(OpType.REPLACE);
+  }
+
+  @Test
+  public void testRetriedReplaceWithOldValue() {
+    doRetriedTest(OpType.REPLACE_WITH_OLDVALUE);
+  }
+
+  @Test
+  public void testRetriedRemove() {
+    doRetriedTest(OpType.REMOVE);
+  }
+
+  private void doRetriedTest(final OpType opType) {
+    Host host = Host.getHost(0);
+    final VM vm0 = host.getVM(0);
+    final VM vm1 = host.getVM(1);
+    final VM vm3 = host.getVM(3);
+
+    int port0 = createServerRegion(vm0, RegionShortcut.REPLICATE);
+    int port1 = createServerRegion(vm1, RegionShortcut.REPLICATE);
+    createClientRegion(vm3, port0, port1);
+
+    vm0.invoke(new SerializableRunnable() {
+
+      @Override
+      public void run() {
+        Region region = getCache().getRegion("region");
+        if (opType != OpType.CREATE && opType != OpType.PUT_IF_ABSENT) {
+          region.put(0, "value");
+        }
+
+        // Add a listener to close vm0 when we send a distributed operation
+        // this will cause a retry after we have applied the original put all to
+        // the cache, causing a retry
+        DistributionMessageObserver
+            .setInstance(new DistributionMessageObserver() {
+
+              @Override
+              public void beforeSendMessage(ClusterDistributionManager dm,
+                  DistributionMessage message) {
+                if (message instanceof DistributedCacheOperation.CacheOperationMessage) {
+                  DistributedCacheOperation.CacheOperationMessage com =
+                      (DistributedCacheOperation.CacheOperationMessage) message;
+                  VersionTag tag = com.getVersionTag();
+                  if (((opType == OpType.CREATE || opType == OpType.PUT_IF_ABSENT)
+                      && tag.getEntryVersion() == 1) || tag.getEntryVersion() == 2) {
+                    DistributionMessageObserver.setInstance(null);
+                    disconnectFromDS(vm0);
+                  }
+                }
+              }
+            });
+
+      }
+    });
+
+    // this put operation will trigger vm1 to be closed, and the put will be retried
+    vm3.invoke(new SerializableCallable("perform update in client") {
+      @Override
+      public Object call() throws Exception {
+        Region region = getCache().getRegion("region");
+        switch (opType) {
+          case CREATE:
+            region.create(0, "newvalue");
+            break;
+          case PUT:
+            region.put(0, "newvalue");
+            break;
+          case DESTROY:
+            region.destroy(0);
+            break;
+          case INVALIDATE:
+            region.invalidate(0);
+            break;
+          case PUT_IF_ABSENT:
+            region.putIfAbsent(0, "newvalue");
+            break;
+          case REPLACE:
+            region.replace(0, "newvalue");
+            break;
+          case REPLACE_WITH_OLDVALUE:
+            region.replace(0, "value", "newvalue");
+            break;
+          case REMOVE:
+            region.remove(0, "value");
+            break;
+        }
+        return null;
+      }
+    });
+
+    // Verify the observer was triggered
+    vm1.invoke(new SerializableRunnable() {
+
+      @Override
+      public void run() {
+        // if the observer was triggered, it would have cleared itself
+        assertNull(DistributionMessageObserver.getInstance());
+        VersionTag tag = ((LocalRegion) getCache().getRegion("region"))
+            .getVersionTag(0);
+        if (opType == OpType.CREATE || opType == OpType.PUT_IF_ABSENT) {
+          assertEquals(1, tag.getRegionVersion());
+        } else {
+          assertEquals(2, tag.getRegionVersion());
+        }
+      }
+    });
+
+    // Make sure vm0 did in fact shut down
+    vm0.invoke(new SerializableRunnable() {
+      @Override
+      public void run() {
+        GemFireCacheImpl cache = (GemFireCacheImpl) basicGetCache();
+        assertTrue(cache == null || cache.isClosed());
+      }
+    });
+  }
 
   /**
    * Test that we can successfully retry a distributed put all and get the version information. bug
@@ -202,7 +361,6 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
     final VM vm2 = host.getVM(2);
     final VM vm3 = host.getVM(3);
 
-
     createServerRegion(vm0, RegionShortcut.PARTITION_REDUNDANT_PERSISTENT);
     vm0.invoke(new SerializableRunnable() {
 
@@ -215,27 +373,28 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
         // Add a listener to close vm1 when we send a distributed put all operation
         // this will cause a retry after we have applied the original put all to
         // the cache, causing a retry
-        DistributionMessageObserver.setInstance(new DistributionMessageObserver() {
-
-          @Override
-          public void beforeSendMessage(ClusterDistributionManager dm,
-              DistributionMessage message) {
-            if (message instanceof DistributedPutAllOperation.PutAllMessage) {
-              DistributionMessageObserver.setInstance(null);
-              disconnectFromDS(vm1);
-            }
-          }
-        });
+        DistributionMessageObserver
+            .setInstance(new DistributionMessageObserver() {
+
+              @Override
+              public void beforeSendMessage(ClusterDistributionManager dm,
+                  DistributionMessage message) {
+                if (message instanceof DistributedPutAllOperation.PutAllMessage) {
+                  DistributionMessageObserver.setInstance(null);
+                  disconnectFromDS(vm1);
+                }
+              }
+            });
 
       }
     });
 
-    int port1 = createServerRegion(vm1, RegionShortcut.PARTITION_REDUNDANT_PERSISTENT);
-    int port2 = createServerRegion(vm2, RegionShortcut.PARTITION_REDUNDANT_PERSISTENT);
+    int port1 = createServerRegion(vm1,
+        RegionShortcut.PARTITION_REDUNDANT_PERSISTENT);
+    int port2 = createServerRegion(vm2,
+        RegionShortcut.PARTITION_REDUNDANT_PERSISTENT);
     createClientRegion(vm3, port1, port2);
 
-
-
     // This will be a put all to bucket 0
     // Here's the expected sequence
     // client->vm1 (accessor0)
@@ -310,31 +469,37 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
     LogWriterUtils.getLogWriter().info("creating region in vm3");
     createRegionInPeer(vm3, RegionShortcut.PARTITION_PROXY);
 
-    expectedExceptions.add(IgnoredException.addIgnoredException("RuntimeException", vm2));
-    vm2.invoke(new SerializableRunnable("install message listener to ignore update") {
-      @Override
-      public void run() {
-        // Add a listener to close vm2 when we send a distributed put all operation
-        // this will cause a retry after we have applied the original put all to
-        // the cache, causing a retry
-        DistributionMessageObserver.setInstance(new DistributionMessageObserver() {
-
+    expectedExceptions
+        .add(IgnoredException.addIgnoredException("RuntimeException", vm2));
+    vm2.invoke(
+        new SerializableRunnable("install message listener to ignore update") {
           @Override
-          public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage msg) {
-            if (msg instanceof DistributedPutAllOperation.PutAllMessage) {
-              DistributionMessageObserver.setInstance(null);
-              Wait.pause(5000); // give vm1 time to process the message that we're ignoring
-              disconnectFromDS(vm0);
-              // no reply will be sent to vm0 due to this exception, but that's okay
-              // because vm0 has been shut down
-              throw new RuntimeException("test code is ignoring message: " + msg);
-            }
+          public void run() {
+            // Add a listener to close vm2 when we send a distributed put all operation
+            // this will cause a retry after we have applied the original put all to
+            // the cache, causing a retry
+            DistributionMessageObserver
+                .setInstance(new DistributionMessageObserver() {
+
+                  @Override
+                  public void beforeProcessMessage(ClusterDistributionManager dm,
+                      DistributionMessage msg) {
+                    if (msg instanceof DistributedPutAllOperation.PutAllMessage) {
+                      DistributionMessageObserver.setInstance(null);
+                      Wait.pause(
+                          5000); // give vm1 time to process the message that we're ignoring
+                      disconnectFromDS(vm0);
+                      // no reply will be sent to vm0 due to this exception, but that's okay
+                      // because vm0 has been shut down
+                      throw new RuntimeException(
+                          "test code is ignoring message: " + msg);
+                    }
+                  }
+                });
+
           }
         });
 
-      }
-    });
-
     // This will be a put all to bucket 0
     // Here's the expected sequence
     // accessor->vm0 (primary)
@@ -354,7 +519,6 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
       }
     });
 
-
     // verify that the version is correct
     vm1.invoke(new SerializableRunnable("verify vm1") {
 
@@ -369,7 +533,6 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
       }
     });
 
-
     // Verify the observer was triggered and the version is correct
     vm2.invoke(new SerializableRunnable("verify vm2") {
 
@@ -404,13 +567,13 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
     });
   }
 
-
-
   private int createServerRegion(VM vm, final RegionShortcut shortcut) {
-    SerializableCallable createRegion = new SerializableCallable("create server region") {
+    SerializableCallable createRegion = new SerializableCallable(
+        "create server region") {
       @Override
       public Object call() throws Exception {
-        RegionFactory<Object, Object> rf = getCache().createRegionFactory(shortcut);
+        RegionFactory<Object, Object> rf = getCache()
+            .createRegionFactory(shortcut);
         if (!shortcut.equals(RegionShortcut.REPLICATE)) {
           rf.setPartitionAttributes(
               new PartitionAttributesFactory().setRedundantCopies(2).create());
@@ -429,10 +592,12 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
   }
 
   private void createRegionInPeer(VM vm, final RegionShortcut shortcut) {
-    SerializableCallable createRegion = new SerializableCallable("create peer region") {
+    SerializableCallable createRegion = new SerializableCallable(
+        "create peer region") {
       @Override
       public Object call() throws Exception {
-        RegionFactory<Object, Object> rf = getCache().createRegionFactory(shortcut);
+        RegionFactory<Object, Object> rf = getCache()
+            .createRegionFactory(shortcut);
         if (!shortcut.equals(RegionShortcut.REPLICATE)) {
           rf.setPartitionAttributes(
               new PartitionAttributesFactory().setRedundantCopies(2).create());
@@ -451,7 +616,8 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
     return p;
   }
 
-  private int createServerRegionWithPersistence(VM vm, final boolean persistentPdxRegistry) {
+  private int createServerRegionWithPersistence(VM vm,
+      final boolean persistentPdxRegistry) {
     SerializableCallable createRegion = new SerializableCallable() {
       @Override
       public Object call() throws Exception {
@@ -461,7 +627,8 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
         }
         //
         Cache cache = getCache(cf);
-        cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("store");
+        cache.createDiskStoreFactory().setDiskDirs(getDiskDirs())
+            .create("store");
 
         AttributesFactory af = new AttributesFactory();
         af.setScope(Scope.DISTRIBUTED_ACK);
@@ -500,17 +667,25 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
     return (Integer) vm.invoke(createRegion);
   }
 
-  private void createClientRegion(final VM vm, final int port1, final int port2) {
-    SerializableCallable createRegion = new SerializableCallable("create client region in " + vm) {
+  private void createClientRegion(final VM vm, final int port1,
+      final int port2) {
+    SerializableCallable createRegion = new SerializableCallable(
+        "create client region in " + vm) {
       @Override
       public Object call() throws Exception {
         ClientCacheFactory cf = new ClientCacheFactory();
+
         cf.addPoolServer(NetworkUtils.getServerHostName(vm.getHost()), port1);
         cf.addPoolServer(NetworkUtils.getServerHostName(vm.getHost()), port2);
         cf.setPoolPRSingleHopEnabled(false);
         cf.setPoolReadTimeout(10 * 60 * 1000);
+        cf.setPoolSubscriptionEnabled(true);
+
         ClientCache cache = getClientCache(cf);
-        cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("region");
+        Region region =
+            cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
+                .create("region");
+        region.registerInterest("ALL_KEYS");
         return null;
       }
     };
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 3a70b9f..c958d7c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -3101,7 +3101,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     return result;
   }
 
-  private void cacheWriteBeforeRegionClear(RegionEventImpl event)
+  protected void cacheWriteBeforeRegionClear(RegionEventImpl event)
       throws CacheWriterException, TimeoutException {
     // copy into local var to prevent race condition
     CacheWriter writer = basicGetWriter();
@@ -5428,6 +5428,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
       event.setContext(memberId);
       // if this is a replayed or WAN operation we may already have a version tag
       event.setVersionTag(clientEvent.getVersionTag());
+      event.setPossibleDuplicate(clientEvent.isPossibleDuplicate());
       try {
         basicDestroy(event, true, null);
       } catch (ConcurrentCacheModificationException ignore) {
@@ -5459,6 +5460,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
 
       // if this is a replayed operation we may already have a version tag
       event.setVersionTag(clientEvent.getVersionTag());
+      event.setPossibleDuplicate(clientEvent.isPossibleDuplicate());
 
       try {
         basicInvalidate(event);
@@ -5485,6 +5487,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
 
     // if this is a replayed operation we may already have a version tag
     event.setVersionTag(clientEvent.getVersionTag());
+    event.setPossibleDuplicate(clientEvent.isPossibleDuplicate());
 
     try {
       basicUpdateEntryVersion(event);
@@ -8429,6 +8432,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     }
 
     RegionVersionVector myVector = getVersionVector();
+    System.out.println(Thread.currentThread().getName() + ": LocalRegion.clearRegionLocally region="
+        + getName() + "; myVector=" + myVector);
     if (myVector != null) {
       if (isRvvDebugEnabled) {
         logger.trace(LogMarker.RVV_VERBOSE, "processing version information for {}", regionEvent);
@@ -10764,6 +10769,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
 
     try {
       event.setContext(memberId);
+      event.setVersionTag(clientEvent.getVersionTag());
+      event.setPossibleDuplicate(clientEvent.isPossibleDuplicate());
       // we rely on exceptions to tell us that the operation didn't take
       // place. AbstractRegionMap performs the checks and throws the exception
       try {