You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2020/04/28 06:57:24 UTC

[geode] branch feature/GEODE-7702 created (now b868cf6)

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a change to branch feature/GEODE-7702
in repository https://gitbox.apache.org/repos/asf/geode.git.


      at b868cf6  GEODE-7702: bulkOp from accessor or NORMAL should sync with clear

This branch includes the following new commits:

     new b868cf6  GEODE-7702: bulkOp from accessor or NORMAL should sync with clear

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[geode] 01/01: GEODE-7702: bulkOp from accessor or NORMAL should sync with clear

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-7702
in repository https://gitbox.apache.org/repos/asf/geode.git

commit b868cf60559b3d712018ac75eaabd13f011751f7
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Mon Apr 27 23:55:40 2020 -0700

    GEODE-7702: bulkOp from accessor or NORMAL should sync with clear
---
 .../cache30/DistributedAckRegionCCEDUnitTest.java  |  1 -
 .../geode/cache30/PutAllMultiVmDUnitTest.java      | 92 +++++++++++++++++++++-
 .../apache/geode/internal/cache/LocalRegion.java   |  4 +-
 .../geode/internal/cache/LocalRegionDataView.java  |  7 ++
 .../internal/cache/tx/RemotePutAllMessage.java     |  2 +
 .../internal/cache/tx/RemoteRemoveAllMessage.java  |  2 +
 6 files changed, 101 insertions(+), 7 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
index 25a98cd..bd606c9 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
@@ -264,7 +264,6 @@ public class DistributedAckRegionCCEDUnitTest extends DistributedAckRegionDUnitT
   }
 
   @Test
-  @Ignore("Enable after fix for bug GEODE-1891")
   public void testClearOnNonReplicateWithConcurrentEvents() {
     versionTestClearOnNonReplicateWithConcurrentEvents();
   }
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache30/PutAllMultiVmDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache30/PutAllMultiVmDUnitTest.java
index 51715a4..fc3f331 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache30/PutAllMultiVmDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache30/PutAllMultiVmDUnitTest.java
@@ -20,12 +20,17 @@
  */
 package org.apache.geode.cache30;
 
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
 import java.util.TreeMap;
 
 import org.junit.Test;
@@ -41,6 +46,7 @@ import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.Scope;
 import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.Invoke;
 import org.apache.geode.test.dunit.SerializableRunnable;
@@ -63,8 +69,8 @@ public class PutAllMultiVmDUnitTest extends JUnit4DistributedTestCase { // TODO:
     Host host = Host.getHost(0);
     VM vm0 = host.getVM(0);
     VM vm1 = host.getVM(1);
-    vm0.invoke(() -> PutAllMultiVmDUnitTest.createCache());
-    vm1.invoke(() -> PutAllMultiVmDUnitTest.createCache());
+    vm0.invoke(() -> PutAllMultiVmDUnitTest.createCache(DataPolicy.REPLICATE));
+    vm1.invoke(() -> PutAllMultiVmDUnitTest.createCache(DataPolicy.REPLICATE));
   }
 
   @Override
@@ -83,15 +89,15 @@ public class PutAllMultiVmDUnitTest extends JUnit4DistributedTestCase { // TODO:
     });
   }
 
-  public static void createCache() {
+  public static void createCache(DataPolicy dataPolicy) {
     try {
       ds = (new PutAllMultiVmDUnitTest()).getSystem(props);
       cache = CacheFactory.create(ds);
       AttributesFactory factory = new AttributesFactory();
       factory.setScope(Scope.DISTRIBUTED_ACK);
+      factory.setDataPolicy(dataPolicy);
       RegionAttributes attr = factory.create();
       region = cache.createRegion("map", attr);
-
     } catch (Exception ex) {
       ex.printStackTrace();
     }
@@ -120,8 +126,86 @@ public class PutAllMultiVmDUnitTest extends JUnit4DistributedTestCase { // TODO:
     }
   }// end of closeCache
 
+  private AsyncInvocation invokeClear(VM vm) {
+    AsyncInvocation async = vm.invokeAsync(() -> region.clear());
+    return async;
+  }
+
+  private AsyncInvocation invokeBulkOp(VM vm) {
+    AsyncInvocation async = vm.invokeAsync(() -> {
+      Map m = new HashMap();
+      for (int i = 0; i < 20; i++) {
+        m.put(new Integer(i), new String("map" + i));
+      }
+      region.putAll(m);
+
+      HashSet m2 = new HashSet();
+      for (int i = 0; i < 10; i++) {
+        m2.add(new Integer(i));
+      }
+      region.removeAll(m2);
+    });
+    return async;
+  }
+
+  private void testBulkOpFromNonDataStore(final DataPolicy dataPolicy) throws InterruptedException {
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+
+    vm2.invoke(() -> PutAllMultiVmDUnitTest.createCache(dataPolicy));
+    Random rand = new Random();
+    for (int k = 0; k < 100; k++) {
+      int shuffle = rand.nextInt(2);
+      AsyncInvocation a1 = null;
+      AsyncInvocation a2 = null;
+      if (shuffle == 1) {
+        a1 = invokeClear(vm1);
+        a2 = invokeBulkOp(vm2);
+      } else {
+        a2 = invokeBulkOp(vm2);
+        a1 = invokeClear(vm1);
+      }
+      a1.await();
+      a2.await();
+
+      // verify vm0 and vm1 has the same keys
+      await().untilAsserted(() -> {
+        Set vm0Contents = vm0.invoke(() -> {
+          final HashSet<Object> keys = new HashSet<>();
+          for (Object o : region.keySet()) {
+            keys.add(o);
+          }
+          return keys;
+        }); // replicated
+        Set vm1Contents = vm1.invoke(() -> {
+          final HashSet<Object> keys = new HashSet<>();
+          for (Object o : region.keySet()) {
+            keys.add(o);
+          }
+          return keys;
+        }); // replicated
+        assertThat(vm0Contents).isEqualTo(vm1Contents);
+      });
+    }
+  }
 
   // tests methods
+  @Test
+  public void testPutAllFromAccessor() throws InterruptedException {
+    testBulkOpFromNonDataStore(DataPolicy.EMPTY);
+  }
+
+  @Test
+  public void testPutAllFromNormal() throws InterruptedException {
+    testBulkOpFromNonDataStore(DataPolicy.NORMAL);
+  }
+
+  @Test
+  public void testPutAllFromPreload() throws InterruptedException {
+    testBulkOpFromNonDataStore(DataPolicy.PRELOADED);
+  }
 
   @Test
   public void testSimplePutAll() {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 8559f77..b075b3b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -9359,7 +9359,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
    * putAll completes. This won't work for non-replicate regions though since they uses one-hop
    * during basicPutPart2 to get a valid version tag.
    */
-  private void lockRVVForBulkOp() {
+  public void lockRVVForBulkOp() {
     ARMLockTestHook testHook = getRegionMap().getARMLockTestHook();
     if (testHook != null) {
       testHook.beforeBulkLock(this);
@@ -9374,7 +9374,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     }
   }
 
-  private void unlockRVVForBulkOp() {
+  public void unlockRVVForBulkOp() {
     ARMLockTestHook testHook = getRegionMap().getARMLockTestHook();
     if (testHook != null) {
       testHook.beforeBulkRelease(this);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
index 36ab278..f7f442b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache;
 import java.util.Collection;
 import java.util.Set;
 
+import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.EntryNotFoundException;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.Region.Entry;
@@ -352,6 +353,9 @@ public class LocalRegionDataView implements InternalDataView {
       successfulPuts.clear();
       putallOp.fillVersionedObjectList(successfulPuts);
     }
+    if (reg.getDataPolicy() == DataPolicy.NORMAL || reg.getDataPolicy() == DataPolicy.PRELOADED) {
+      return;
+    }
     // BR & DR's putAll
     long token = -1;
     try {
@@ -374,6 +378,9 @@ public class LocalRegionDataView implements InternalDataView {
       successfulOps.clear();
       op.fillVersionedObjectList(successfulOps);
     }
+    if (reg.getDataPolicy() == DataPolicy.NORMAL || reg.getDataPolicy() == DataPolicy.PRELOADED) {
+      return;
+    }
     // BR, DR's removeAll
     long token = -1;
     try {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemotePutAllMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemotePutAllMessage.java
index 7399969..54b6a6d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemotePutAllMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemotePutAllMessage.java
@@ -363,6 +363,7 @@ public class RemotePutAllMessage extends RemoteOperationMessageWithDirectReply {
       final DistributedPutAllOperation dpao =
           new DistributedPutAllOperation(baseEvent, putAllDataCount, false);
       try {
+        r.lockRVVForBulkOp();
         final VersionedObjectList versions =
             new VersionedObjectList(putAllDataCount, true, dr.getConcurrencyChecksEnabled());
         dr.syncBulkOp(new Runnable() {
@@ -397,6 +398,7 @@ public class RemotePutAllMessage extends RemoteOperationMessageWithDirectReply {
             this.putAllDataCount);
         return false;
       } finally {
+        r.unlockRVVForBulkOp();
         dpao.freeOffHeapResources();
       }
     } finally {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteRemoveAllMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteRemoveAllMessage.java
index a2e95a3..beb202d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteRemoveAllMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteRemoveAllMessage.java
@@ -355,6 +355,7 @@ public class RemoteRemoveAllMessage extends RemoteOperationMessageWithDirectRepl
       final DistributedRemoveAllOperation op =
           new DistributedRemoveAllOperation(baseEvent, removeAllDataCount, false);
       try {
+        r.lockRVVForBulkOp();
         final VersionedObjectList versions =
             new VersionedObjectList(removeAllDataCount, true, dr.getConcurrencyChecksEnabled());
         dr.syncBulkOp(new Runnable() {
@@ -391,6 +392,7 @@ public class RemoteRemoveAllMessage extends RemoteOperationMessageWithDirectRepl
             this.removeAllDataCount);
         return false;
       } finally {
+        r.unlockRVVForBulkOp();
         op.freeOffHeapResources();
       }
     } finally {