You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2020/04/28 06:57:25 UTC

[geode] 01/01: GEODE-7702: bulkOp from accessor or NORMAL should sync with clear

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-7702
in repository https://gitbox.apache.org/repos/asf/geode.git

commit b868cf60559b3d712018ac75eaabd13f011751f7
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Mon Apr 27 23:55:40 2020 -0700

    GEODE-7702: bulkOp from accessor or NORMAL should sync with clear
---
 .../cache30/DistributedAckRegionCCEDUnitTest.java  |  1 -
 .../geode/cache30/PutAllMultiVmDUnitTest.java      | 92 +++++++++++++++++++++-
 .../apache/geode/internal/cache/LocalRegion.java   |  4 +-
 .../geode/internal/cache/LocalRegionDataView.java  |  7 ++
 .../internal/cache/tx/RemotePutAllMessage.java     |  2 +
 .../internal/cache/tx/RemoteRemoveAllMessage.java  |  2 +
 6 files changed, 101 insertions(+), 7 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
index 25a98cd..bd606c9 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
@@ -264,7 +264,6 @@ public class DistributedAckRegionCCEDUnitTest extends DistributedAckRegionDUnitT
   }
 
   @Test
-  @Ignore("Enable after fix for bug GEODE-1891")
   public void testClearOnNonReplicateWithConcurrentEvents() {
     versionTestClearOnNonReplicateWithConcurrentEvents();
   }
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache30/PutAllMultiVmDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache30/PutAllMultiVmDUnitTest.java
index 51715a4..fc3f331 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache30/PutAllMultiVmDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache30/PutAllMultiVmDUnitTest.java
@@ -20,12 +20,17 @@
  */
 package org.apache.geode.cache30;
 
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
 import java.util.TreeMap;
 
 import org.junit.Test;
@@ -41,6 +46,7 @@ import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.Scope;
 import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.Invoke;
 import org.apache.geode.test.dunit.SerializableRunnable;
@@ -63,8 +69,8 @@ public class PutAllMultiVmDUnitTest extends JUnit4DistributedTestCase { // TODO:
     Host host = Host.getHost(0);
     VM vm0 = host.getVM(0);
     VM vm1 = host.getVM(1);
-    vm0.invoke(() -> PutAllMultiVmDUnitTest.createCache());
-    vm1.invoke(() -> PutAllMultiVmDUnitTest.createCache());
+    vm0.invoke(() -> PutAllMultiVmDUnitTest.createCache(DataPolicy.REPLICATE));
+    vm1.invoke(() -> PutAllMultiVmDUnitTest.createCache(DataPolicy.REPLICATE));
   }
 
   @Override
@@ -83,15 +89,15 @@ public class PutAllMultiVmDUnitTest extends JUnit4DistributedTestCase { // TODO:
     });
   }
 
-  public static void createCache() {
+  public static void createCache(DataPolicy dataPolicy) {
     try {
       ds = (new PutAllMultiVmDUnitTest()).getSystem(props);
       cache = CacheFactory.create(ds);
       AttributesFactory factory = new AttributesFactory();
       factory.setScope(Scope.DISTRIBUTED_ACK);
+      factory.setDataPolicy(dataPolicy);
       RegionAttributes attr = factory.create();
       region = cache.createRegion("map", attr);
-
     } catch (Exception ex) {
       ex.printStackTrace();
     }
@@ -120,8 +126,86 @@ public class PutAllMultiVmDUnitTest extends JUnit4DistributedTestCase { // TODO:
     }
   }// end of closeCache
 
+  private AsyncInvocation invokeClear(VM vm) {
+    AsyncInvocation async = vm.invokeAsync(() -> region.clear());
+    return async;
+  }
+
+  private AsyncInvocation invokeBulkOp(VM vm) {
+    AsyncInvocation async = vm.invokeAsync(() -> {
+      Map m = new HashMap();
+      for (int i = 0; i < 20; i++) {
+        m.put(new Integer(i), new String("map" + i));
+      }
+      region.putAll(m);
+
+      HashSet m2 = new HashSet();
+      for (int i = 0; i < 10; i++) {
+        m2.add(new Integer(i));
+      }
+      region.removeAll(m2);
+    });
+    return async;
+  }
+
+  private void testBulkOpFromNonDataStore(final DataPolicy dataPolicy) throws InterruptedException {
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+
+    vm2.invoke(() -> PutAllMultiVmDUnitTest.createCache(dataPolicy));
+    Random rand = new Random();
+    for (int k = 0; k < 100; k++) {
+      int shuffle = rand.nextInt(2);
+      AsyncInvocation a1 = null;
+      AsyncInvocation a2 = null;
+      if (shuffle == 1) {
+        a1 = invokeClear(vm1);
+        a2 = invokeBulkOp(vm2);
+      } else {
+        a2 = invokeBulkOp(vm2);
+        a1 = invokeClear(vm1);
+      }
+      a1.await();
+      a2.await();
+
+      // verify vm0 and vm1 has the same keys
+      await().untilAsserted(() -> {
+        Set vm0Contents = vm0.invoke(() -> {
+          final HashSet<Object> keys = new HashSet<>();
+          for (Object o : region.keySet()) {
+            keys.add(o);
+          }
+          return keys;
+        }); // replicated
+        Set vm1Contents = vm1.invoke(() -> {
+          final HashSet<Object> keys = new HashSet<>();
+          for (Object o : region.keySet()) {
+            keys.add(o);
+          }
+          return keys;
+        }); // replicated
+        assertThat(vm0Contents).isEqualTo(vm1Contents);
+      });
+    }
+  }
 
   // tests methods
+  @Test
+  public void testPutAllFromAccessor() throws InterruptedException {
+    testBulkOpFromNonDataStore(DataPolicy.EMPTY);
+  }
+
+  @Test
+  public void testPutAllFromNormal() throws InterruptedException {
+    testBulkOpFromNonDataStore(DataPolicy.NORMAL);
+  }
+
+  @Test
+  public void testPutAllFromPreload() throws InterruptedException {
+    testBulkOpFromNonDataStore(DataPolicy.PRELOADED);
+  }
 
   @Test
   public void testSimplePutAll() {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 8559f77..b075b3b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -9359,7 +9359,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
    * putAll completes. This won't work for non-replicate regions though since they uses one-hop
    * during basicPutPart2 to get a valid version tag.
    */
-  private void lockRVVForBulkOp() {
+  public void lockRVVForBulkOp() {
     ARMLockTestHook testHook = getRegionMap().getARMLockTestHook();
     if (testHook != null) {
       testHook.beforeBulkLock(this);
@@ -9374,7 +9374,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     }
   }
 
-  private void unlockRVVForBulkOp() {
+  public void unlockRVVForBulkOp() {
     ARMLockTestHook testHook = getRegionMap().getARMLockTestHook();
     if (testHook != null) {
       testHook.beforeBulkRelease(this);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
index 36ab278..f7f442b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache;
 import java.util.Collection;
 import java.util.Set;
 
+import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.EntryNotFoundException;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.Region.Entry;
@@ -352,6 +353,9 @@ public class LocalRegionDataView implements InternalDataView {
       successfulPuts.clear();
       putallOp.fillVersionedObjectList(successfulPuts);
     }
+    if (reg.getDataPolicy() == DataPolicy.NORMAL || reg.getDataPolicy() == DataPolicy.PRELOADED) {
+      return;
+    }
     // BR & DR's putAll
     long token = -1;
     try {
@@ -374,6 +378,9 @@ public class LocalRegionDataView implements InternalDataView {
       successfulOps.clear();
       op.fillVersionedObjectList(successfulOps);
     }
+    if (reg.getDataPolicy() == DataPolicy.NORMAL || reg.getDataPolicy() == DataPolicy.PRELOADED) {
+      return;
+    }
     // BR, DR's removeAll
     long token = -1;
     try {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemotePutAllMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemotePutAllMessage.java
index 7399969..54b6a6d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemotePutAllMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemotePutAllMessage.java
@@ -363,6 +363,7 @@ public class RemotePutAllMessage extends RemoteOperationMessageWithDirectReply {
       final DistributedPutAllOperation dpao =
           new DistributedPutAllOperation(baseEvent, putAllDataCount, false);
       try {
+        r.lockRVVForBulkOp();
         final VersionedObjectList versions =
             new VersionedObjectList(putAllDataCount, true, dr.getConcurrencyChecksEnabled());
         dr.syncBulkOp(new Runnable() {
@@ -397,6 +398,7 @@ public class RemotePutAllMessage extends RemoteOperationMessageWithDirectReply {
             this.putAllDataCount);
         return false;
       } finally {
+        r.unlockRVVForBulkOp();
         dpao.freeOffHeapResources();
       }
     } finally {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteRemoveAllMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteRemoveAllMessage.java
index a2e95a3..beb202d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteRemoveAllMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteRemoveAllMessage.java
@@ -355,6 +355,7 @@ public class RemoteRemoveAllMessage extends RemoteOperationMessageWithDirectRepl
       final DistributedRemoveAllOperation op =
           new DistributedRemoveAllOperation(baseEvent, removeAllDataCount, false);
       try {
+        r.lockRVVForBulkOp();
         final VersionedObjectList versions =
             new VersionedObjectList(removeAllDataCount, true, dr.getConcurrencyChecksEnabled());
         dr.syncBulkOp(new Runnable() {
@@ -391,6 +392,7 @@ public class RemoteRemoveAllMessage extends RemoteOperationMessageWithDirectRepl
             this.removeAllDataCount);
         return false;
       } finally {
+        r.unlockRVVForBulkOp();
         op.freeOffHeapResources();
       }
     } finally {