You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2019/06/28 07:09:39 UTC
[geode] 01/01: GEODE-6908: retried REMOVE should not create new
version tag. added retry dunit tests for all the c/s operations.
This is an automated email from the ASF dual-hosted git repository.
zhouxj pushed a commit to branch feature/GEODE-6908
in repository https://gitbox.apache.org/repos/asf/geode.git
commit c57d15803dd9c9146af92a027724eaf777aa22d4
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Fri Jun 28 00:06:11 2019 -0700
GEODE-6908: retried REMOVE should not create new version tag.
added retry dunit tests for all the c/s operations.
---
.../pdx/ClientsWithVersioningRetryDUnitTest.java | 285 +++++++++++++++++----
.../apache/geode/internal/cache/LocalRegion.java | 9 +-
2 files changed, 238 insertions(+), 56 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/pdx/ClientsWithVersioningRetryDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/pdx/ClientsWithVersioningRetryDUnitTest.java
index e992d21..5beab36 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/pdx/ClientsWithVersioningRetryDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/pdx/ClientsWithVersioningRetryDUnitTest.java
@@ -49,6 +49,7 @@ import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.DistributionMessageObserver;
import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.cache.DistributedCacheOperation;
import org.apache.geode.internal.cache.DistributedPutAllOperation;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.EntryEventImpl;
@@ -117,7 +118,6 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
final VM vm0 = host.getVM(0);
final VM vm1 = host.getVM(1);
-
createServerRegion(vm0, RegionShortcut.REPLICATE);
createServerRegion(vm1, RegionShortcut.REPLICATE);
@@ -160,7 +160,8 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
event.setContext(new ClientProxyMembershipID(memberID));
boolean recovered =
((BaseCommand) Put70.getCommand()).recoverVersionTagForRetriedOperation(event);
- assertTrue("Expected to recover the version for this event ID", recovered);
+ assertTrue("Expected to recover the version for this event ID",
+ recovered);
assertEquals("Expected the region version to be 123", 123,
event.getVersionTag().getRegionVersion());
} finally {
@@ -173,10 +174,12 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
vm1.invoke(new SerializableRunnable("recover posdup event tag in vm1 event tracker from vm0") {
@Override
public void run() {
- DistributedRegion dr = (DistributedRegion) getCache().getRegion("region");
+ DistributedRegion dr = (DistributedRegion) getCache()
+ .getRegion("region");
EventID eventID = new EventID(new byte[0], 1, 0);
- EntryEventImpl event = EntryEventImpl.create(dr, Operation.CREATE, "TestObject",
- "TestValue", null, false, memberID, true, eventID);
+ EntryEventImpl event = EntryEventImpl
+ .create(dr, Operation.CREATE, "TestObject",
+ "TestValue", null, false, memberID, true, eventID);
event.setPossibleDuplicate(true);
try {
dr.hasSeenEvent(event);
@@ -189,6 +192,162 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
});
}
+ enum OpType {
+ CREATE,
+ PUT,
+ DESTROY,
+ INVALIDATE,
+ PUT_IF_ABSENT,
+ REPLACE,
+ REPLACE_WITH_OLDVALUE,
+ REMOVE
+ };
+
+ @Test
+ public void testRetriedCreate() {
+ doRetriedTest(OpType.CREATE);
+ }
+
+ @Test
+ public void testRetriedPut() {
+ doRetriedTest(OpType.PUT);
+ }
+
+ @Test
+ public void testRetriedDestroy() {
+ doRetriedTest(OpType.DESTROY);
+ }
+
+ @Test
+ public void testRetriedInvalidate() {
+ doRetriedTest(OpType.INVALIDATE);
+ }
+
+ @Test
+ public void testRetriedPutIfAbsent() {
+ doRetriedTest(OpType.PUT_IF_ABSENT);
+ }
+
+ @Test
+ public void testRetriedReplace() {
+ doRetriedTest(OpType.REPLACE);
+ }
+
+ @Test
+ public void testRetriedReplaceWithOldValue() {
+ doRetriedTest(OpType.REPLACE_WITH_OLDVALUE);
+ }
+
+ @Test
+ public void testRetriedRemove() {
+ doRetriedTest(OpType.REMOVE);
+ }
+
+ private void doRetriedTest(final OpType opType) {
+ Host host = Host.getHost(0);
+ final VM vm0 = host.getVM(0);
+ final VM vm1 = host.getVM(1);
+ final VM vm3 = host.getVM(3);
+
+ int port0 = createServerRegion(vm0, RegionShortcut.REPLICATE);
+ int port1 = createServerRegion(vm1, RegionShortcut.REPLICATE);
+ createClientRegion(vm3, port0, port1);
+
+ vm0.invoke(new SerializableRunnable() {
+
+ @Override
+ public void run() {
+ Region region = getCache().getRegion("region");
+ if (opType != OpType.CREATE && opType != OpType.PUT_IF_ABSENT) {
+ region.put(0, "value");
+ }
+
+ // Add a listener to close vm0 when we send a distributed operation
+ // this will cause a retry after we have applied the original put all to
+ // the cache, causing a retry
+ DistributionMessageObserver
+ .setInstance(new DistributionMessageObserver() {
+
+ @Override
+ public void beforeSendMessage(ClusterDistributionManager dm,
+ DistributionMessage message) {
+ if (message instanceof DistributedCacheOperation.CacheOperationMessage) {
+ DistributedCacheOperation.CacheOperationMessage com =
+ (DistributedCacheOperation.CacheOperationMessage) message;
+ VersionTag tag = com.getVersionTag();
+ if (((opType == OpType.CREATE || opType == OpType.PUT_IF_ABSENT)
+ && tag.getEntryVersion() == 1) || tag.getEntryVersion() == 2) {
+ DistributionMessageObserver.setInstance(null);
+ disconnectFromDS(vm0);
+ }
+ }
+ }
+ });
+
+ }
+ });
+
+ // this put operation will trigger vm1 to be closed, and the put will be retried
+ vm3.invoke(new SerializableCallable("perform update in client") {
+ @Override
+ public Object call() throws Exception {
+ Region region = getCache().getRegion("region");
+ switch (opType) {
+ case CREATE:
+ region.create(0, "newvalue");
+ break;
+ case PUT:
+ region.put(0, "newvalue");
+ break;
+ case DESTROY:
+ region.destroy(0);
+ break;
+ case INVALIDATE:
+ region.invalidate(0);
+ break;
+ case PUT_IF_ABSENT:
+ region.putIfAbsent(0, "newvalue");
+ break;
+ case REPLACE:
+ region.replace(0, "newvalue");
+ break;
+ case REPLACE_WITH_OLDVALUE:
+ region.replace(0, "value", "newvalue");
+ break;
+ case REMOVE:
+ region.remove(0, "value");
+ break;
+ }
+ return null;
+ }
+ });
+
+ // Verify the observer was triggered
+ vm1.invoke(new SerializableRunnable() {
+
+ @Override
+ public void run() {
+ // if the observer was triggered, it would have cleared itself
+ assertNull(DistributionMessageObserver.getInstance());
+ VersionTag tag = ((LocalRegion) getCache().getRegion("region"))
+ .getVersionTag(0);
+ if (opType == OpType.CREATE || opType == OpType.PUT_IF_ABSENT) {
+ assertEquals(1, tag.getRegionVersion());
+ } else {
+ assertEquals(2, tag.getRegionVersion());
+ }
+ }
+ });
+
+ // Make sure vm0 did in fact shut down
+ vm0.invoke(new SerializableRunnable() {
+ @Override
+ public void run() {
+ GemFireCacheImpl cache = (GemFireCacheImpl) basicGetCache();
+ assertTrue(cache == null || cache.isClosed());
+ }
+ });
+ }
/**
* Test that we can successfully retry a distributed put all and get the version information. bug
@@ -202,7 +361,6 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
final VM vm2 = host.getVM(2);
final VM vm3 = host.getVM(3);
-
createServerRegion(vm0, RegionShortcut.PARTITION_REDUNDANT_PERSISTENT);
vm0.invoke(new SerializableRunnable() {
@@ -215,27 +373,28 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
// Add a listener to close vm1 when we send a distributed put all operation
// this will cause a retry after we have applied the original put all to
// the cache, causing a retry
- DistributionMessageObserver.setInstance(new DistributionMessageObserver() {
-
- @Override
- public void beforeSendMessage(ClusterDistributionManager dm,
- DistributionMessage message) {
- if (message instanceof DistributedPutAllOperation.PutAllMessage) {
- DistributionMessageObserver.setInstance(null);
- disconnectFromDS(vm1);
- }
- }
- });
+ DistributionMessageObserver
+ .setInstance(new DistributionMessageObserver() {
+
+ @Override
+ public void beforeSendMessage(ClusterDistributionManager dm,
+ DistributionMessage message) {
+ if (message instanceof DistributedPutAllOperation.PutAllMessage) {
+ DistributionMessageObserver.setInstance(null);
+ disconnectFromDS(vm1);
+ }
+ }
+ });
}
});
- int port1 = createServerRegion(vm1, RegionShortcut.PARTITION_REDUNDANT_PERSISTENT);
- int port2 = createServerRegion(vm2, RegionShortcut.PARTITION_REDUNDANT_PERSISTENT);
+ int port1 = createServerRegion(vm1,
+ RegionShortcut.PARTITION_REDUNDANT_PERSISTENT);
+ int port2 = createServerRegion(vm2,
+ RegionShortcut.PARTITION_REDUNDANT_PERSISTENT);
createClientRegion(vm3, port1, port2);
-
-
// This will be a put all to bucket 0
// Here's the expected sequence
// client->vm1 (accessor0)
@@ -310,31 +469,37 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
LogWriterUtils.getLogWriter().info("creating region in vm3");
createRegionInPeer(vm3, RegionShortcut.PARTITION_PROXY);
- expectedExceptions.add(IgnoredException.addIgnoredException("RuntimeException", vm2));
- vm2.invoke(new SerializableRunnable("install message listener to ignore update") {
- @Override
- public void run() {
- // Add a listener to close vm2 when we send a distributed put all operation
- // this will cause a retry after we have applied the original put all to
- // the cache, causing a retry
- DistributionMessageObserver.setInstance(new DistributionMessageObserver() {
-
+ expectedExceptions
+ .add(IgnoredException.addIgnoredException("RuntimeException", vm2));
+ vm2.invoke(
+ new SerializableRunnable("install message listener to ignore update") {
@Override
- public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage msg) {
- if (msg instanceof DistributedPutAllOperation.PutAllMessage) {
- DistributionMessageObserver.setInstance(null);
- Wait.pause(5000); // give vm1 time to process the message that we're ignoring
- disconnectFromDS(vm0);
- // no reply will be sent to vm0 due to this exception, but that's okay
- // because vm0 has been shut down
- throw new RuntimeException("test code is ignoring message: " + msg);
- }
+ public void run() {
+ // Add a listener to close vm2 when we send a distributed put all operation
+ // this will cause a retry after we have applied the original put all to
+ // the cache, causing a retry
+ DistributionMessageObserver
+ .setInstance(new DistributionMessageObserver() {
+
+ @Override
+ public void beforeProcessMessage(ClusterDistributionManager dm,
+ DistributionMessage msg) {
+ if (msg instanceof DistributedPutAllOperation.PutAllMessage) {
+ DistributionMessageObserver.setInstance(null);
+ Wait.pause(
+ 5000); // give vm1 time to process the message that we're ignoring
+ disconnectFromDS(vm0);
+ // no reply will be sent to vm0 due to this exception, but that's okay
+ // because vm0 has been shut down
+ throw new RuntimeException(
+ "test code is ignoring message: " + msg);
+ }
+ }
+ });
+
}
});
- }
- });
-
// This will be a put all to bucket 0
// Here's the expected sequence
// accessor->vm0 (primary)
@@ -354,7 +519,6 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
}
});
-
// verify that the version is correct
vm1.invoke(new SerializableRunnable("verify vm1") {
@@ -369,7 +533,6 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
}
});
-
// Verify the observer was triggered and the version is correct
vm2.invoke(new SerializableRunnable("verify vm2") {
@@ -404,13 +567,13 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
});
}
-
-
private int createServerRegion(VM vm, final RegionShortcut shortcut) {
- SerializableCallable createRegion = new SerializableCallable("create server region") {
+ SerializableCallable createRegion = new SerializableCallable(
+ "create server region") {
@Override
public Object call() throws Exception {
- RegionFactory<Object, Object> rf = getCache().createRegionFactory(shortcut);
+ RegionFactory<Object, Object> rf = getCache()
+ .createRegionFactory(shortcut);
if (!shortcut.equals(RegionShortcut.REPLICATE)) {
rf.setPartitionAttributes(
new PartitionAttributesFactory().setRedundantCopies(2).create());
@@ -429,10 +592,12 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
}
private void createRegionInPeer(VM vm, final RegionShortcut shortcut) {
- SerializableCallable createRegion = new SerializableCallable("create peer region") {
+ SerializableCallable createRegion = new SerializableCallable(
+ "create peer region") {
@Override
public Object call() throws Exception {
- RegionFactory<Object, Object> rf = getCache().createRegionFactory(shortcut);
+ RegionFactory<Object, Object> rf = getCache()
+ .createRegionFactory(shortcut);
if (!shortcut.equals(RegionShortcut.REPLICATE)) {
rf.setPartitionAttributes(
new PartitionAttributesFactory().setRedundantCopies(2).create());
@@ -451,7 +616,8 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
return p;
}
- private int createServerRegionWithPersistence(VM vm, final boolean persistentPdxRegistry) {
+ private int createServerRegionWithPersistence(VM vm,
+ final boolean persistentPdxRegistry) {
SerializableCallable createRegion = new SerializableCallable() {
@Override
public Object call() throws Exception {
@@ -461,7 +627,8 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
}
//
Cache cache = getCache(cf);
- cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("store");
+ cache.createDiskStoreFactory().setDiskDirs(getDiskDirs())
+ .create("store");
AttributesFactory af = new AttributesFactory();
af.setScope(Scope.DISTRIBUTED_ACK);
@@ -500,17 +667,25 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase {
return (Integer) vm.invoke(createRegion);
}
- private void createClientRegion(final VM vm, final int port1, final int port2) {
- SerializableCallable createRegion = new SerializableCallable("create client region in " + vm) {
+ private void createClientRegion(final VM vm, final int port1,
+ final int port2) {
+ SerializableCallable createRegion = new SerializableCallable(
+ "create client region in " + vm) {
@Override
public Object call() throws Exception {
ClientCacheFactory cf = new ClientCacheFactory();
+
cf.addPoolServer(NetworkUtils.getServerHostName(vm.getHost()), port1);
cf.addPoolServer(NetworkUtils.getServerHostName(vm.getHost()), port2);
cf.setPoolPRSingleHopEnabled(false);
cf.setPoolReadTimeout(10 * 60 * 1000);
+ cf.setPoolSubscriptionEnabled(true);
+
ClientCache cache = getClientCache(cf);
- cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("region");
+ Region region =
+ cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
+ .create("region");
+ region.registerInterest("ALL_KEYS");
return null;
}
};
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 3a70b9f..c958d7c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -3101,7 +3101,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
return result;
}
- private void cacheWriteBeforeRegionClear(RegionEventImpl event)
+ protected void cacheWriteBeforeRegionClear(RegionEventImpl event)
throws CacheWriterException, TimeoutException {
// copy into local var to prevent race condition
CacheWriter writer = basicGetWriter();
@@ -5428,6 +5428,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
event.setContext(memberId);
// if this is a replayed or WAN operation we may already have a version tag
event.setVersionTag(clientEvent.getVersionTag());
+ event.setPossibleDuplicate(clientEvent.isPossibleDuplicate());
try {
basicDestroy(event, true, null);
} catch (ConcurrentCacheModificationException ignore) {
@@ -5459,6 +5460,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
// if this is a replayed operation we may already have a version tag
event.setVersionTag(clientEvent.getVersionTag());
+ event.setPossibleDuplicate(clientEvent.isPossibleDuplicate());
try {
basicInvalidate(event);
@@ -5485,6 +5487,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
// if this is a replayed operation we may already have a version tag
event.setVersionTag(clientEvent.getVersionTag());
+ event.setPossibleDuplicate(clientEvent.isPossibleDuplicate());
try {
basicUpdateEntryVersion(event);
@@ -8429,6 +8432,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
RegionVersionVector myVector = getVersionVector();
+ System.out.println(Thread.currentThread().getName() + ": LocalRegion.clearRegionLocally region="
+ + getName() + "; myVector=" + myVector);
if (myVector != null) {
if (isRvvDebugEnabled) {
logger.trace(LogMarker.RVV_VERBOSE, "processing version information for {}", regionEvent);
@@ -10764,6 +10769,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
try {
event.setContext(memberId);
+ event.setVersionTag(clientEvent.getVersionTag());
+ event.setPossibleDuplicate(clientEvent.isPossibleDuplicate());
// we rely on exceptions to tell us that the operation didn't take
// place. AbstractRegionMap performs the checks and throws the exception
try {