You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2020/04/28 06:57:25 UTC
[geode] 01/01: GEODE-7702: bulkOp from accessor or NORMAL should sync with clear
This is an automated email from the ASF dual-hosted git repository.
zhouxj pushed a commit to branch feature/GEODE-7702
in repository https://gitbox.apache.org/repos/asf/geode.git
commit b868cf60559b3d712018ac75eaabd13f011751f7
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Mon Apr 27 23:55:40 2020 -0700
GEODE-7702: bulkOp from accessor or NORMAL should sync with clear
---
.../cache30/DistributedAckRegionCCEDUnitTest.java | 1 -
.../geode/cache30/PutAllMultiVmDUnitTest.java | 92 +++++++++++++++++++++-
.../apache/geode/internal/cache/LocalRegion.java | 4 +-
.../geode/internal/cache/LocalRegionDataView.java | 7 ++
.../internal/cache/tx/RemotePutAllMessage.java | 2 +
.../internal/cache/tx/RemoteRemoveAllMessage.java | 2 +
6 files changed, 101 insertions(+), 7 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
index 25a98cd..bd606c9 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
@@ -264,7 +264,6 @@ public class DistributedAckRegionCCEDUnitTest extends DistributedAckRegionDUnitT
}
@Test
- @Ignore("Enable after fix for bug GEODE-1891")
public void testClearOnNonReplicateWithConcurrentEvents() {
versionTestClearOnNonReplicateWithConcurrentEvents();
}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache30/PutAllMultiVmDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache30/PutAllMultiVmDUnitTest.java
index 51715a4..fc3f331 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache30/PutAllMultiVmDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache30/PutAllMultiVmDUnitTest.java
@@ -20,12 +20,17 @@
*/
package org.apache.geode.cache30;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
import java.util.TreeMap;
import org.junit.Test;
@@ -41,6 +46,7 @@ import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.Scope;
import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.SerializableRunnable;
@@ -63,8 +69,8 @@ public class PutAllMultiVmDUnitTest extends JUnit4DistributedTestCase { // TODO:
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
- vm0.invoke(() -> PutAllMultiVmDUnitTest.createCache());
- vm1.invoke(() -> PutAllMultiVmDUnitTest.createCache());
+ vm0.invoke(() -> PutAllMultiVmDUnitTest.createCache(DataPolicy.REPLICATE));
+ vm1.invoke(() -> PutAllMultiVmDUnitTest.createCache(DataPolicy.REPLICATE));
}
@Override
@@ -83,15 +89,15 @@ public class PutAllMultiVmDUnitTest extends JUnit4DistributedTestCase { // TODO:
});
}
- public static void createCache() {
+ public static void createCache(DataPolicy dataPolicy) {
try {
ds = (new PutAllMultiVmDUnitTest()).getSystem(props);
cache = CacheFactory.create(ds);
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
+ factory.setDataPolicy(dataPolicy);
RegionAttributes attr = factory.create();
region = cache.createRegion("map", attr);
-
} catch (Exception ex) {
ex.printStackTrace();
}
@@ -120,8 +126,86 @@ public class PutAllMultiVmDUnitTest extends JUnit4DistributedTestCase { // TODO:
}
}// end of closeCache
+ private AsyncInvocation invokeClear(VM vm) {
+ AsyncInvocation async = vm.invokeAsync(() -> region.clear());
+ return async;
+ }
+
+ private AsyncInvocation invokeBulkOp(VM vm) {
+ AsyncInvocation async = vm.invokeAsync(() -> {
+ Map m = new HashMap();
+ for (int i = 0; i < 20; i++) {
+ m.put(new Integer(i), new String("map" + i));
+ }
+ region.putAll(m);
+
+ HashSet m2 = new HashSet();
+ for (int i = 0; i < 10; i++) {
+ m2.add(new Integer(i));
+ }
+ region.removeAll(m2);
+ });
+ return async;
+ }
+
+ private void testBulkOpFromNonDataStore(final DataPolicy dataPolicy) throws InterruptedException {
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = host.getVM(2);
+
+ vm2.invoke(() -> PutAllMultiVmDUnitTest.createCache(dataPolicy));
+ Random rand = new Random();
+ for (int k = 0; k < 100; k++) {
+ int shuffle = rand.nextInt(2);
+ AsyncInvocation a1 = null;
+ AsyncInvocation a2 = null;
+ if (shuffle == 1) {
+ a1 = invokeClear(vm1);
+ a2 = invokeBulkOp(vm2);
+ } else {
+ a2 = invokeBulkOp(vm2);
+ a1 = invokeClear(vm1);
+ }
+ a1.await();
+ a2.await();
+
+ // verify vm0 and vm1 have the same keys
+ await().untilAsserted(() -> {
+ Set vm0Contents = vm0.invoke(() -> {
+ final HashSet<Object> keys = new HashSet<>();
+ for (Object o : region.keySet()) {
+ keys.add(o);
+ }
+ return keys;
+ }); // replicated
+ Set vm1Contents = vm1.invoke(() -> {
+ final HashSet<Object> keys = new HashSet<>();
+ for (Object o : region.keySet()) {
+ keys.add(o);
+ }
+ return keys;
+ }); // replicated
+ assertThat(vm0Contents).isEqualTo(vm1Contents);
+ });
+ }
+ }
// tests methods
+ @Test
+ public void testPutAllFromAccessor() throws InterruptedException {
+ testBulkOpFromNonDataStore(DataPolicy.EMPTY);
+ }
+
+ @Test
+ public void testPutAllFromNormal() throws InterruptedException {
+ testBulkOpFromNonDataStore(DataPolicy.NORMAL);
+ }
+
+ @Test
+ public void testPutAllFromPreload() throws InterruptedException {
+ testBulkOpFromNonDataStore(DataPolicy.PRELOADED);
+ }
@Test
public void testSimplePutAll() {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 8559f77..b075b3b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -9359,7 +9359,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
* putAll completes. This won't work for non-replicate regions though since they uses one-hop
* during basicPutPart2 to get a valid version tag.
*/
- private void lockRVVForBulkOp() {
+ public void lockRVVForBulkOp() {
ARMLockTestHook testHook = getRegionMap().getARMLockTestHook();
if (testHook != null) {
testHook.beforeBulkLock(this);
@@ -9374,7 +9374,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
}
- private void unlockRVVForBulkOp() {
+ public void unlockRVVForBulkOp() {
ARMLockTestHook testHook = getRegionMap().getARMLockTestHook();
if (testHook != null) {
testHook.beforeBulkRelease(this);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
index 36ab278..f7f442b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache;
import java.util.Collection;
import java.util.Set;
+import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.Region.Entry;
@@ -352,6 +353,9 @@ public class LocalRegionDataView implements InternalDataView {
successfulPuts.clear();
putallOp.fillVersionedObjectList(successfulPuts);
}
+ if (reg.getDataPolicy() == DataPolicy.NORMAL || reg.getDataPolicy() == DataPolicy.PRELOADED) {
+ return;
+ }
// BR & DR's putAll
long token = -1;
try {
@@ -374,6 +378,9 @@ public class LocalRegionDataView implements InternalDataView {
successfulOps.clear();
op.fillVersionedObjectList(successfulOps);
}
+ if (reg.getDataPolicy() == DataPolicy.NORMAL || reg.getDataPolicy() == DataPolicy.PRELOADED) {
+ return;
+ }
// BR, DR's removeAll
long token = -1;
try {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemotePutAllMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemotePutAllMessage.java
index 7399969..54b6a6d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemotePutAllMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemotePutAllMessage.java
@@ -363,6 +363,7 @@ public class RemotePutAllMessage extends RemoteOperationMessageWithDirectReply {
final DistributedPutAllOperation dpao =
new DistributedPutAllOperation(baseEvent, putAllDataCount, false);
try {
+ r.lockRVVForBulkOp();
final VersionedObjectList versions =
new VersionedObjectList(putAllDataCount, true, dr.getConcurrencyChecksEnabled());
dr.syncBulkOp(new Runnable() {
@@ -397,6 +398,7 @@ public class RemotePutAllMessage extends RemoteOperationMessageWithDirectReply {
this.putAllDataCount);
return false;
} finally {
+ r.unlockRVVForBulkOp();
dpao.freeOffHeapResources();
}
} finally {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteRemoveAllMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteRemoveAllMessage.java
index a2e95a3..beb202d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteRemoveAllMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteRemoveAllMessage.java
@@ -355,6 +355,7 @@ public class RemoteRemoveAllMessage extends RemoteOperationMessageWithDirectRepl
final DistributedRemoveAllOperation op =
new DistributedRemoveAllOperation(baseEvent, removeAllDataCount, false);
try {
+ r.lockRVVForBulkOp();
final VersionedObjectList versions =
new VersionedObjectList(removeAllDataCount, true, dr.getConcurrencyChecksEnabled());
dr.syncBulkOp(new Runnable() {
@@ -391,6 +392,7 @@ public class RemoteRemoveAllMessage extends RemoteOperationMessageWithDirectRepl
this.removeAllDataCount);
return false;
} finally {
+ r.unlockRVVForBulkOp();
op.freeOffHeapResources();
}
} finally {