You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kh...@apache.org on 2016/12/01 19:04:46 UTC
[1/2] incubator-geode git commit: GEODE-1740: Apply spotless check
Repository: incubator-geode
Updated Branches:
refs/heads/develop f0cdb66ae -> 70479204e
GEODE-1740: Apply spotless check
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/70479204
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/70479204
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/70479204
Branch: refs/heads/develop
Commit: 70479204eb06f270906b07d82fdeb369de5a40b5
Parents: 7fa2c08
Author: Scott Jewell <sj...@pivotal.io>
Authored: Fri Nov 11 16:21:38 2016 -0800
Committer: Kenneth Howe <kh...@apache.org>
Committed: Thu Dec 1 11:03:47 2016 -0800
----------------------------------------------------------------------
.../internal/cache/ClearTXLockingDUnitTest.java | 96 +++++++++++---------
1 file changed, 51 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/70479204/geode-core/src/test/java/org/apache/geode/internal/cache/ClearTXLockingDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ClearTXLockingDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ClearTXLockingDUnitTest.java
index b620383..615ed97 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ClearTXLockingDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ClearTXLockingDUnitTest.java
@@ -54,9 +54,10 @@ import org.junit.experimental.categories.Category;
* operation.
*
* GEODE-1740: It was observed that operations performed within a transaction were not holding
- * region modification locks for the duration of commit processing. This lock is used to ensure region
- * consistency during CLEAR processing. By not holding the lock for the duration of commit processing,
- * a window was opened that allowed region operations such as clear to occur in mid-commit.
+ * region modification locks for the duration of commit processing. This lock is used to ensure
+ * region consistency during CLEAR processing. By not holding the lock for the duration of commit
+ * processing, a window was opened that allowed region operations such as clear to occur in
+ * mid-commit.
*
* The fix for GEODE-1740 was to acquire and hold read locks for any region involved in the commit.
* This forces CLEAR to wait until commit processing is complete.
@@ -68,15 +69,15 @@ public class ClearTXLockingDUnitTest extends JUnit4CacheTestCase {
@Rule
public transient JUnitSoftAssertions softly = new JUnitSoftAssertions();
/*
- * This test performs operations within a transaction and during commit processing
- * schedules a clear to be performed on the relevant region. The scheduled clear should wait until
- * commit processing is complete before clearing the region. Failure to do so, would result in
- * region inconsistencies.
+ * This test performs operations within a transaction and during commit processing schedules a
+ * clear to be performed on the relevant region. The scheduled clear should wait until commit
+ * processing is complete before clearing the region. Failure to do so, would result in region
+ * inconsistencies.
*/
VM vm0, vm1, opsVM, regionVM;
static Cache cache;
-
+
ArmLockHook theArmHook;
DistributedMember vm0ID, vm1ID;
@@ -113,10 +114,10 @@ public class ClearTXLockingDUnitTest extends JUnit4CacheTestCase {
setClearHook(REGION_NAME1, opsVM, regionVM);
performTestAndCheckResults(putOperationsTest);
}
-
+
/*
- * The CLOSE tests are ignored until the close operation has been
- * updated to acquire a write lock during processing.
+ * The CLOSE tests are ignored until the close operation has been updated to acquire a write lock
+ * during processing.
*/
@Ignore
@Test
@@ -135,10 +136,10 @@ public class ClearTXLockingDUnitTest extends JUnit4CacheTestCase {
setCloseHook(REGION_NAME1, opsVM, regionVM);
performTestAndCheckResults(putOperationsTest);
}
-
+
/*
- * The DESTROY_REGION tests are ignored until the destroy operation has been
- * updated to acquire a write lock during processing.
+ * The DESTROY_REGION tests are ignored until the destroy operation has been updated to acquire a
+ * write lock during processing.
*/
@Ignore
@Test
@@ -157,13 +158,14 @@ public class ClearTXLockingDUnitTest extends JUnit4CacheTestCase {
setDestroyRegionHook(REGION_NAME1, opsVM, regionVM);
performTestAndCheckResults(putOperationsTest);
}
-
+
// Local methods
/*
* This method executes a runnable test and then checks for region consistency
*/
- private void performTestAndCheckResults(SerializableRunnable operationsTest) throws InterruptedException {
+ private void performTestAndCheckResults(SerializableRunnable operationsTest)
+ throws InterruptedException {
try {
runLockingTest(opsVM, operationsTest);
checkForConsistencyErrors(REGION_NAME1);
@@ -174,17 +176,17 @@ public class ClearTXLockingDUnitTest extends JUnit4CacheTestCase {
}
/*
- * We will be using 2 vms. One for the transaction and one for the clear
+ * We will be using 2 vms. One for the transaction and one for the clear
*/
private void getVMs() {
Host host = Host.getHost(0);
vm0 = host.getVM(0);
vm1 = host.getVM(1);
}
-
+
/*
- * Set which vm will perform the transaction and which will perform the region operation
- * and create the regions on the vms
+ * Set which vm will perform the transaction and which will perform the region operation and
+ * create the regions on the vms
*/
private void setupRegions(VM opsTarget, VM regionTarget) {
opsVM = opsTarget;
@@ -205,7 +207,7 @@ public class ClearTXLockingDUnitTest extends JUnit4CacheTestCase {
}
/*
- * Runnable used to invoke the actual test
+ * Runnable used to invoke the actual test
*/
SerializableRunnable putOperationsTest = new SerializableRunnable("perform PUT") {
@Override
@@ -215,8 +217,8 @@ public class ClearTXLockingDUnitTest extends JUnit4CacheTestCase {
};
/*
- * Set arm hook to detect when region operation is attempting to acquire write lock
- * and stage the clear that will be released half way through commit processing.
+ * Set arm hook to detect when region operation is attempting to acquire write lock and stage the
+ * clear that will be released half way through commit processing.
*/
public void setClearHook(String rname, VM whereOps, VM whereClear) {
whereOps.invoke(() -> setArmHook(rname));
@@ -226,8 +228,8 @@ public class ClearTXLockingDUnitTest extends JUnit4CacheTestCase {
// remote test methods
/*
- * Wait to be notified and then execute the clear.
- * Once the clear completes, notify waiter to perform region verification.
+ * Wait to be notified and then execute the clear. Once the clear completes, notify waiter to
+ * perform region verification.
*/
private static void stageClear(String rname, VM whereOps) throws InterruptedException {
regionOperationWait();
@@ -264,8 +266,8 @@ public class ClearTXLockingDUnitTest extends JUnit4CacheTestCase {
}
/*
- * Set the abstract region map lock hook to detect
- * attempt to acquire write lock by region operation.
+ * Set the abstract region map lock hook to detect attempt to acquire write lock by region
+ * operation.
*/
public void setArmHook(String rname) {
LocalRegion r = (LocalRegion) cache.getRegion(rname);
@@ -288,11 +290,11 @@ public class ClearTXLockingDUnitTest extends JUnit4CacheTestCase {
regionLatch = new CountDownLatch(1);
regionLatch.await();
}
-
+
/*
- * A simple transaction that will have a region operation execute during commit.
- * opsLatch is used to wait until region operation has been scheduled during commit
- * and verifyLatch is used to ensure commit and clear processing have both completed.
+ * A simple transaction that will have a region operation execute during commit. opsLatch is used
+ * to wait until region operation has been scheduled during commit and verifyLatch is used to
+ * ensure commit and clear processing have both completed.
*/
private static void doPuts(Cache cache, VM whereRegion) throws InterruptedException {
TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
@@ -301,8 +303,8 @@ public class ClearTXLockingDUnitTest extends JUnit4CacheTestCase {
verifyLatch = new CountDownLatch(1);
txManager.begin();
- TXStateInterface txState = ((TXStateProxyImpl)txManager.getTXState()).getRealDeal(null,null);
- ((TXState)txState).setDuringApplyChanges(new CommitTestCallback(whereRegion));
+ TXStateInterface txState = ((TXStateProxyImpl) txManager.getTXState()).getRealDeal(null, null);
+ ((TXState) txState).setDuringApplyChanges(new CommitTestCallback(whereRegion));
Region region1 = cache.getRegion(REGION_NAME1);
Region region2 = cache.getRegion(REGION_NAME2);
@@ -384,34 +386,37 @@ public class ClearTXLockingDUnitTest extends JUnit4CacheTestCase {
}
return result;
}
-
+
/*
- * Test callback called for each operation during commit processing.
- * Half way through commit processing, release the region operation.
+ * Test callback called for each operation during commit processing. Half way through commit
+ * processing, release the region operation.
*/
static class CommitTestCallback implements Runnable {
VM whereRegionOperation;
static int callCount;
/* entered twice for each put lap since there are 2 regions */
static int releasePoint = NUMBER_OF_PUTS;
-
+
public CommitTestCallback(VM whereRegion) {
whereRegionOperation = whereRegion;
callCount = 0;
}
-
+
public void run() {
callCount++;
- if(callCount==releasePoint) {
+ if (callCount == releasePoint) {
releaseRegionOperation(whereRegionOperation);
- try {opsLatch.await();} catch (InterruptedException e) {}
+ try {
+ opsLatch.await();
+ } catch (InterruptedException e) {
+ }
}
}
}
-
+
/*
- * The region operations attempt to acquire the write lock will hang while
- * commit processing is occurring. Before this occurs, resume commit processing.
+ * The region operations attempt to acquire the write lock will hang while commit processing is
+ * occurring. Before this occurs, resume commit processing.
*/
public class ArmLockHook extends ARMLockTestHookAdapter {
int txCalls = 0;
@@ -420,8 +425,9 @@ public class ClearTXLockingDUnitTest extends JUnit4CacheTestCase {
@Override
public void beforeLock(LocalRegion owner, CacheEvent event) {
- if(event!=null) {
- if (event.getOperation().isClear() || event.getOperation().isRegionDestroy() || event.getOperation().isClose()) {
+ if (event != null) {
+ if (event.getOperation().isClear() || event.getOperation().isRegionDestroy()
+ || event.getOperation().isClose()) {
releaseOps();
}
}
[2/2] incubator-geode git commit: GEODE-1740: Correct potential region inconsistencies with concurrent clear and transaction commit
Posted by kh...@apache.org.
GEODE-1740: Correct potential region inconsistencies with concurrent clear and transaction commit
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/7fa2c08c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/7fa2c08c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/7fa2c08c
Branch: refs/heads/develop
Commit: 7fa2c08cf403139afa7a60a81392ca13034e490b
Parents: f0cdb66
Author: Scott Jewell <sj...@pivotal.io>
Authored: Wed Nov 2 15:59:35 2016 -0700
Committer: Kenneth Howe <kh...@apache.org>
Committed: Thu Dec 1 11:03:47 2016 -0800
----------------------------------------------------------------------
.../geode/internal/cache/AbstractRegionMap.java | 43 +-
.../apache/geode/internal/cache/RegionMap.java | 4 +
.../apache/geode/internal/cache/TXState.java | 79 +++-
.../internal/cache/ClearTXLockingDUnitTest.java | 431 +++++++++++++++++++
4 files changed, 505 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7fa2c08c/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
index 96936eef..e3e87ea 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
@@ -1551,7 +1551,6 @@ public abstract class AbstractRegionMap implements RegionMap {
final boolean isRegionReady = !inTokenMode;
final boolean hasRemoteOrigin = !((TXId) txId).getMemberId().equals(owner.getMyId());
boolean cbEventInPending = false;
- lockForTXCacheModification(owner, versionTag);
IndexManager oqlIndexManager = owner.getIndexManager();
try {
RegionEntry re = getEntry(key);
@@ -1818,8 +1817,6 @@ public abstract class AbstractRegionMap implements RegionMap {
} catch (DiskAccessException dae) {
owner.handleDiskAccessException(dae);
throw dae;
- } finally {
- releaseTXCacheModificationLock(owner, versionTag);
}
}
@@ -2353,7 +2350,6 @@ public abstract class AbstractRegionMap implements RegionMap {
if (oqlIndexManager != null) {
oqlIndexManager.waitForIndexInit();
}
- lockForTXCacheModification(owner, versionTag);
try {
if (forceNewEntry) {
boolean opCompleted = false;
@@ -2582,7 +2578,6 @@ public abstract class AbstractRegionMap implements RegionMap {
owner.handleDiskAccessException(dae);
throw dae;
} finally {
- releaseTXCacheModificationLock(owner, versionTag);
if (oqlIndexManager != null) {
oqlIndexManager.countDownIndexUpdaters();
}
@@ -3115,7 +3110,6 @@ public abstract class AbstractRegionMap implements RegionMap {
if (oqlIndexManager != null) {
oqlIndexManager.waitForIndexInit();
}
- lockForTXCacheModification(owner, versionTag);
try {
if (hasRemoteOrigin && !isTXHost && !isClientTXOriginator) {
// If we are not a mirror then only apply the update to existing
@@ -3384,7 +3378,6 @@ public abstract class AbstractRegionMap implements RegionMap {
owner.handleDiskAccessException(dae);
throw dae;
} finally {
- releaseTXCacheModificationLock(owner, versionTag);
if (oqlIndexManager != null) {
oqlIndexManager.countDownIndexUpdaters();
}
@@ -3693,40 +3686,32 @@ public abstract class AbstractRegionMap implements RegionMap {
}
- /** get version-generation permission from the region's version vector */
- private void lockForTXCacheModification(LocalRegion owner, VersionTag tag) {
-
+ @Override
+ public void lockRegionForAtomicTX(LocalRegion r) {
if (armLockTestHook != null)
- armLockTestHook.beforeLock(owner, null);
+ armLockTestHook.beforeLock(r, null);
- if (!(tag != null && tag.isFromOtherMember())) {
- RegionVersionVector vector = owner.getVersionVector();
- if (vector != null && !owner.hasServerProxy()) {
- vector.lockForCacheModification();
- }
+ RegionVersionVector vector = r.getVersionVector();
+ if (vector != null) {
+ vector.lockForCacheModification();
}
if (armLockTestHook != null)
- armLockTestHook.afterLock(owner, null);
-
+ armLockTestHook.afterLock(r, null);
}
- /** release version-generation permission from the region's version vector */
- private void releaseTXCacheModificationLock(LocalRegion owner, VersionTag tag) {
-
+ @Override
+ public void unlockRegionForAtomicTX(LocalRegion r) {
if (armLockTestHook != null)
- armLockTestHook.beforeRelease(owner, null);
+ armLockTestHook.beforeRelease(r, null);
- if (!(tag != null && tag.isFromOtherMember())) {
- RegionVersionVector vector = owner.getVersionVector();
- if (vector != null && !owner.hasServerProxy()) {
- vector.releaseCacheModificationLock();
- }
+ RegionVersionVector vector = r.getVersionVector();
+ if (vector != null) {
+ vector.releaseCacheModificationLock();
}
if (armLockTestHook != null)
- armLockTestHook.afterRelease(owner, null);
-
+ armLockTestHook.afterRelease(r, null);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7fa2c08c/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMap.java
index ee8a84e..7ecabd7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMap.java
@@ -372,6 +372,10 @@ public interface RegionMap extends LRUMapCallbacks {
public void close();
+ default void lockRegionForAtomicTX(LocalRegion r) {}
+
+ default void unlockRegionForAtomicTX(LocalRegion r) {}
+
public ARMLockTestHook getARMLockTestHook();
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7fa2c08c/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index 99a3b83..d577f39 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -103,6 +103,7 @@ public class TXState implements TXStateInterface {
// Internal testing hooks
private Runnable internalAfterReservation;
protected Runnable internalAfterConflictCheck;
+ protected Runnable internalDuringApplyChanges;
protected Runnable internalAfterApplyChanges;
protected Runnable internalAfterReleaseLocalLocks;
Runnable internalDuringIndividualSend; // package scope allows TXCommitMessage use
@@ -460,34 +461,38 @@ public class TXState implements TXStateInterface {
attachFilterProfileInformation(entries);
- // apply changes to the cache
- applyChanges(entries);
- // For internal testing
- if (this.internalAfterApplyChanges != null) {
- this.internalAfterApplyChanges.run();
- }
+ lockTXRegions(regions);
- // build and send the message
- msg = buildMessage();
- this.commitMessage = msg;
- if (this.internalBeforeSend != null) {
- this.internalBeforeSend.run();
- }
+ try {
+ // apply changes to the cache
+ applyChanges(entries);
+ // For internal testing
+ if (this.internalAfterApplyChanges != null) {
+ this.internalAfterApplyChanges.run();
+ }
+ // build and send the message
+ msg = buildMessage();
+ this.commitMessage = msg;
+ if (this.internalBeforeSend != null) {
+ this.internalBeforeSend.run();
+ }
+ msg.send(this.locks.getDistributedLockId());
+ // For internal testing
+ if (this.internalAfterSend != null) {
+ this.internalAfterSend.run();
+ }
- msg.send(this.locks.getDistributedLockId());
- // For internal testing
- if (this.internalAfterSend != null) {
- this.internalAfterSend.run();
+ firePendingCallbacks();
+ /*
+ * This is to prepare the commit message for the caller, make sure all events are in
+ * there.
+ */
+ this.commitMessage = buildCompleteMessage();
+ } finally {
+ unlockTXRegions(regions);
}
-
- firePendingCallbacks();
- /*
- * This is to prepare the commit message for the caller, make sure all events are in there.
- */
- this.commitMessage = buildCompleteMessage();
-
} finally {
if (msg != null) {
msg.releaseViewVersions();
@@ -503,6 +508,24 @@ public class TXState implements TXStateInterface {
}
}
+ private void lockTXRegions(IdentityHashMap<LocalRegion, TXRegionState> regions) {
+ Iterator<Map.Entry<LocalRegion, TXRegionState>> it = regions.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<LocalRegion, TXRegionState> me = it.next();
+ LocalRegion r = me.getKey();
+ r.getRegionMap().lockRegionForAtomicTX(r);
+ }
+ }
+
+ private void unlockTXRegions(IdentityHashMap<LocalRegion, TXRegionState> regions) {
+ Iterator<Map.Entry<LocalRegion, TXRegionState>> it = regions.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<LocalRegion, TXRegionState> me = it.next();
+ LocalRegion r = me.getKey();
+ r.getRegionMap().unlockRegionForAtomicTX(r);
+ }
+ }
+
protected void attachFilterProfileInformation(List entries) {
{
Iterator/* <TXEntryStateWithRegionAndKey> */ it = entries.iterator();
@@ -769,6 +792,9 @@ public class TXState implements TXStateInterface {
Iterator/* <TXEntryStateWithRegionAndKey> */ it = entries.iterator();
while (it.hasNext()) {
TXEntryStateWithRegionAndKey o = (TXEntryStateWithRegionAndKey) it.next();
+ if (this.internalDuringApplyChanges != null) {
+ this.internalDuringApplyChanges.run();
+ }
try {
o.es.applyChanges(o.r, o.key, this);
} catch (RegionDestroyedException ex) {
@@ -1073,6 +1099,13 @@ public class TXState implements TXStateInterface {
}
/**
+ * Add an internal callback which is run as each transaction change is applied.
+ */
+ public void setDuringApplyChanges(Runnable duringApplyChanges) {
+ this.internalDuringApplyChanges = duringApplyChanges;
+ }
+
+ /**
* Add an internal callback which is run after the transaction changes have been applied to
* committed state (locally) but before local locks are released (occurs for regions of Local and
* Distributed No Ack scope).
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7fa2c08c/geode-core/src/test/java/org/apache/geode/internal/cache/ClearTXLockingDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ClearTXLockingDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ClearTXLockingDUnitTest.java
new file mode 100644
index 0000000..b620383
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ClearTXLockingDUnitTest.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+/*
+ * ClearRvvLockingDUnitTest.java
+ *
+ * Created on September 6, 2005, 2:57 PM
+ */
+package org.apache.geode.internal.cache;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheEvent;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.CacheTransactionManager;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.SerializableCallable;
+import org.apache.geode.test.dunit.SerializableRunnable;
+import org.apache.geode.test.dunit.SerializableRunnableIF;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.logging.log4j.Logger;
+import org.assertj.core.api.JUnitSoftAssertions;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test class to verify proper locking interaction between transactions and the CLEAR region
+ * operation.
+ *
+ * GEODE-1740: It was observed that operations performed within a transaction were not holding
+ * region modification locks for the duration of commit processing. This lock is used to ensure region
+ * consistency during CLEAR processing. By not holding the lock for the duration of commit processing,
+ * a window was opened that allowed region operations such as clear to occur in mid-commit.
+ *
+ * The fix for GEODE-1740 was to acquire and hold read locks for any region involved in the commit.
+ * This forces CLEAR to wait until commit processing is complete.
+ */
+@SuppressWarnings("serial")
+@Category(DistributedTest.class)
+public class ClearTXLockingDUnitTest extends JUnit4CacheTestCase {
+
+ @Rule
+ public transient JUnitSoftAssertions softly = new JUnitSoftAssertions();
+ /*
+ * This test performs operations within a transaction and during commit processing
+ * schedules a clear to be performed on the relevant region. The scheduled clear should wait until
+ * commit processing is complete before clearing the region. Failure to do so, would result in
+ * region inconsistencies.
+ */
+ VM vm0, vm1, opsVM, regionVM;
+
+ static Cache cache;
+
+ ArmLockHook theArmHook;
+
+ DistributedMember vm0ID, vm1ID;
+
+ static CacheTransactionManager txmgr;
+
+ static final String THE_KEY = "theKey";
+ static final String THE_VALUE = "theValue";
+ static final int NUMBER_OF_PUTS = 2;
+
+ static final String REGION_NAME1 = "testRegion1";
+ static final String REGION_NAME2 = "testRegion2";
+
+ static CountDownLatch opsLatch;
+ static CountDownLatch regionLatch;
+ static CountDownLatch verifyLatch;
+
+ private static final Logger logger = LogService.getLogger();
+
+ // test methods
+
+ @Test
+ public void testPutWithClearSameVM() throws InterruptedException {
+ getVMs();
+ setupRegions(vm0, vm0);
+ setClearHook(REGION_NAME1, opsVM, regionVM);
+ performTestAndCheckResults(putOperationsTest);
+ }
+
+ @Test
+ public void testPutWithClearDifferentVM() throws InterruptedException {
+ getVMs();
+ setupRegions(vm0, vm1);
+ setClearHook(REGION_NAME1, opsVM, regionVM);
+ performTestAndCheckResults(putOperationsTest);
+ }
+
+ /*
+ * The CLOSE tests are ignored until the close operation has been
+ * updated to acquire a write lock during processing.
+ */
+ @Ignore
+ @Test
+ public void testPutWithCloseSameVM() throws InterruptedException {
+ getVMs();
+ setupRegions(vm0, vm0);
+ setCloseHook(REGION_NAME1, opsVM, regionVM);
+ performTestAndCheckResults(putOperationsTest);
+ }
+
+ @Ignore
+ @Test
+ public void testPutWithCloseDifferentVM() throws InterruptedException {
+ getVMs();
+ setupRegions(vm0, vm1);
+ setCloseHook(REGION_NAME1, opsVM, regionVM);
+ performTestAndCheckResults(putOperationsTest);
+ }
+
+ /*
+ * The DESTROY_REGION tests are ignored until the destroy operation has been
+ * updated to acquire a write lock during processing.
+ */
+ @Ignore
+ @Test
+ public void testPutWithDestroyRegionSameVM() throws InterruptedException {
+ getVMs();
+ setupRegions(vm0, vm0);
+ setDestroyRegionHook(REGION_NAME1, opsVM, regionVM);
+ performTestAndCheckResults(putOperationsTest);
+ }
+
+ @Ignore
+ @Test
+ public void testPutWithDestroyRegionDifferentVM() throws InterruptedException {
+ getVMs();
+ setupRegions(vm0, vm1);
+ setDestroyRegionHook(REGION_NAME1, opsVM, regionVM);
+ performTestAndCheckResults(putOperationsTest);
+ }
+
+ // Local methods
+
+ /*
+ * This method executes a runnable test and then checks for region consistency
+ */
+ private void performTestAndCheckResults(SerializableRunnable operationsTest) throws InterruptedException {
+ try {
+ runLockingTest(opsVM, operationsTest);
+ checkForConsistencyErrors(REGION_NAME1);
+ checkForConsistencyErrors(REGION_NAME2);
+ } finally {
+ opsVM.invoke(() -> resetArmHook(REGION_NAME1));
+ }
+ }
+
+ /*
+ * We will be using 2 vms. One for the transaction and one for the clear
+ */
+ private void getVMs() {
+ Host host = Host.getHost(0);
+ vm0 = host.getVM(0);
+ vm1 = host.getVM(1);
+ }
+
+ /*
+ * Set which vm will perform the transaction and which will perform the region operation
+ * and create the regions on the vms
+ */
+ private void setupRegions(VM opsTarget, VM regionTarget) {
+ opsVM = opsTarget;
+ regionVM = regionTarget;
+ vm0ID = createCache(vm0);
+ vm1ID = createCache(vm1);
+ vm0.invoke(() -> createRegion(REGION_NAME1));
+ vm0.invoke(() -> createRegion(REGION_NAME2));
+ vm1.invoke(() -> createRegion(REGION_NAME1));
+ vm1.invoke(() -> createRegion(REGION_NAME2));
+ }
+
+ /*
+ * Invoke a runnable on the operations vm
+ */
+ private void runLockingTest(VM vm, SerializableRunnableIF theTest) {
+ vm.invoke(theTest);
+ }
+
+ /*
+ * Runnable used to invoke the actual test
+ */
+ SerializableRunnable putOperationsTest = new SerializableRunnable("perform PUT") {
+ @Override
+ public void run() {
+ opsVM.invoke(() -> doPuts(getCache(), regionVM));
+ }
+ };
+
+ /*
+ * Set arm hook to detect when region operation is attempting to acquire write lock
+ * and stage the clear that will be released half way through commit processing.
+ */
+ public void setClearHook(String rname, VM whereOps, VM whereClear) {
+ whereOps.invoke(() -> setArmHook(rname));
+ whereClear.invokeAsync(() -> stageClear(rname, whereOps));
+ }
+
+ // remote test methods
+
+ /*
+ * Wait to be notified and then execute the clear.
+ * Once the clear completes, notify waiter to perform region verification.
+ */
+ private static void stageClear(String rname, VM whereOps) throws InterruptedException {
+ regionOperationWait();
+ LocalRegion r = (LocalRegion) cache.getRegion(rname);
+ r.clear();
+ whereOps.invoke(() -> releaseVerify());
+ }
+
+ /*
+ * Set and stage method for close and destroy are the same as clear
+ */
+ public void setCloseHook(String rname, VM whereOps, VM whereClear) {
+ whereOps.invoke(() -> setArmHook(rname));
+ whereClear.invokeAsync(() -> stageClose(rname, whereOps));
+ }
+
+ private static void stageClose(String rname, VM whereOps) throws InterruptedException {
+ regionOperationWait();
+ LocalRegion r = (LocalRegion) cache.getRegion(rname);
+ r.close();
+ whereOps.invoke(() -> releaseVerify());
+ }
+
+ public void setDestroyRegionHook(String rname, VM whereOps, VM whereClear) {
+ whereOps.invoke(() -> setArmHook(rname));
+ whereClear.invokeAsync(() -> stageDestroyRegion(rname, whereOps));
+ }
+
+ private static void stageDestroyRegion(String rname, VM whereOps) throws InterruptedException {
+ regionOperationWait();
+ LocalRegion r = (LocalRegion) cache.getRegion(rname);
+ r.destroyRegion();
+ whereOps.invoke(() -> releaseVerify());
+ }
+
+ /*
+ * Set the abstract region map lock hook to detect
+ * attempt to acquire write lock by region operation.
+ */
+ public void setArmHook(String rname) {
+ LocalRegion r = (LocalRegion) cache.getRegion(rname);
+ theArmHook = new ArmLockHook();
+ ((AbstractRegionMap) r.entries).setARMLockTestHook(theArmHook);
+ }
+
+ /*
+ * Cleanup arm lock hook by setting it null
+ */
+ public void resetArmHook(String rname) {
+ LocalRegion r = (LocalRegion) cache.getRegion(rname);
+ ((AbstractRegionMap) r.entries).setARMLockTestHook(null);
+ }
+
+ /*
+ * Wait to be notified it is time to perform region operation (i.e. CLEAR)
+ */
+ private static void regionOperationWait() throws InterruptedException {
+ regionLatch = new CountDownLatch(1);
+ regionLatch.await();
+ }
+
+  /*
+   * Run one transaction whose commit has a region operation execute part way through it.
+   * opsLatch holds the commit until the region operation has been scheduled; verifyLatch is
+   * awaited after the commit so that both commit and region operation processing have
+   * completed before this method returns.
+   */
+  private static void doPuts(Cache cache, VM whereRegion) throws InterruptedException {
+    TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
+
+    // Fresh latches for this run; they must exist before the commit callback can fire.
+    opsLatch = new CountDownLatch(1);
+    verifyLatch = new CountDownLatch(1);
+
+    txManager.begin();
+    // Reach the real transaction state and register the callback run during apply-changes.
+    TXStateProxyImpl txProxy = (TXStateProxyImpl) txManager.getTXState();
+    TXState realTxState = (TXState) txProxy.getRealDeal(null, null);
+    realTxState.setDuringApplyChanges(new CommitTestCallback(whereRegion));
+
+    Region<Object, Object> firstRegion = cache.getRegion(REGION_NAME1);
+    Region<Object, Object> secondRegion = cache.getRegion(REGION_NAME2);
+    for (int putIndex = 0; putIndex < NUMBER_OF_PUTS; putIndex++) {
+      firstRegion.put(REGION_NAME1 + THE_KEY + putIndex, THE_VALUE + putIndex);
+      secondRegion.put(REGION_NAME2 + THE_KEY + putIndex, THE_VALUE + putIndex);
+    }
+
+    txManager.commit();
+    verifyLatch.await();
+  }
+
+  /*
+   * Let the previously staged region operation proceed by counting down regionLatch in the
+   * VM where that operation is waiting.
+   */
+  private static void releaseRegionOperation(VM whereRegion) {
+    whereRegion.invoke(() -> {
+      regionLatch.countDown();
+    });
+  }
+
+  /*
+   * The region operation has been scheduled; count down opsLatch so that commit processing
+   * resumes.
+   */
+  private static void releaseOps() {
+    final CountDownLatch commitGate = opsLatch;
+    commitGate.countDown();
+  }
+
+  /*
+   * Count down verifyLatch to tell the waiter that region contents can now be verified.
+   */
+  private static void releaseVerify() {
+    final CountDownLatch verifyGate = verifyLatch;
+    verifyGate.countDown();
+  }
+
+  /*
+   * Create the cache in the given VM (with conserve-sockets set to true) and return the
+   * distributed member of that VM's system.
+   */
+  private InternalDistributedMember createCache(VM vm) {
+    SerializableCallable<Object> createAndReportMember = new SerializableCallable<Object>() {
+      @Override
+      public Object call() {
+        cache = getCache(new CacheFactory().set("conserve-sockets", "true"));
+        return getSystem().getDistributedMember();
+      }
+    };
+    return (InternalDistributedMember) vm.invoke(createAndReportMember);
+  }
+
+  /*
+   * Create a REPLICATE region with the given name, using DISTRIBUTED_ACK scope and with
+   * concurrency checks enabled.
+   */
+  private static void createRegion(String rgnName) {
+    RegionFactory<Object, Object> regionFactory =
+        cache.createRegionFactory(RegionShortcut.REPLICATE);
+    regionFactory.setScope(Scope.DISTRIBUTED_ACK);
+    regionFactory.setConcurrencyChecksEnabled(true);
+    regionFactory.create(rgnName);
+  }
+
+  /*
+   * Fetch the named region's contents from vm0 and vm1 and softly assert that, for every key
+   * the puts may have written, both members agree: same value when vm0 has the key, and no
+   * such key on vm1 when vm0 does not have it.
+   */
+  private void checkForConsistencyErrors(String rname) {
+    Map<Object, Object> contentsOnVm0 =
+        (Map<Object, Object>) vm0.invoke(() -> getRegionContents(rname));
+    Map<Object, Object> contentsOnVm1 =
+        (Map<Object, Object>) vm1.invoke(() -> getRegionContents(rname));
+
+    for (int putIndex = 0; putIndex < NUMBER_OF_PUTS; putIndex++) {
+      String key = rname + THE_KEY + putIndex;
+      if (!contentsOnVm0.containsKey(key)) {
+        // Key is absent on vm0, so it must be absent on vm1 as well.
+        softly.assertThat(contentsOnVm1).as("expected containsKey for %s to return false", key)
+            .doesNotContainKey(key);
+        continue;
+      }
+      softly.assertThat(contentsOnVm1.get(key))
+          .as("region contents are not consistent for key %s", key)
+          .isEqualTo(contentsOnVm0.get(key));
+    }
+  }
+
+  /*
+   * Copy every entry of the named region in this VM into a new HashMap and return it.
+   */
+  @SuppressWarnings("rawtypes")
+  private static Map<Object, Object> getRegionContents(String rname) {
+    LocalRegion region = (LocalRegion) cache.getRegion(rname);
+    Map<Object, Object> contents = new HashMap<>();
+    for (Object element : region.entrySet()) {
+      Region.Entry entry = (Region.Entry) element;
+      contents.put(entry.getKey(), entry.getValue());
+    }
+    return contents;
+  }
+
+  /*
+   * Test callback called for each operation during commit processing.
+   * Half way through commit processing, release the staged region operation and hold the
+   * commit until opsLatch is counted down.
+   */
+  static class CommitTestCallback implements Runnable {
+    /* VM in which the staged region operation is waiting to be released */
+    VM whereRegionOperation;
+    /* number of times run() has been entered; reset whenever a callback is constructed */
+    static int callCount;
+    /* entered twice for each put lap since there are 2 regions */
+    static int releasePoint = NUMBER_OF_PUTS;
+
+    public CommitTestCallback(VM whereRegion) {
+      whereRegionOperation = whereRegion;
+      callCount = 0;
+    }
+
+    /*
+     * Count the call; on the call that reaches releasePoint, release the region operation
+     * and wait on opsLatch before letting commit processing continue.
+     */
+    @Override
+    public void run() {
+      callCount++;
+      if (callCount == releasePoint) {
+        releaseRegionOperation(whereRegionOperation);
+        try {
+          opsLatch.await();
+        } catch (InterruptedException e) {
+          // Runnable.run() cannot propagate the exception, so restore the interrupt status
+          // instead of silently dropping it; callers can then observe the interruption.
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+  }
+
+  /*
+   * The region operation's attempt to acquire the write lock will hang while commit
+   * processing is occurring. Just before that attempt, resume commit processing.
+   */
+  public class ArmLockHook extends ARMLockTestHookAdapter {
+    int txCalls = 0;
+    int releasePoint = NUMBER_OF_PUTS / 2;
+    CountDownLatch putLatch = new CountDownLatch(1);
+
+    /*
+     * Called before the lock is taken; for a clear, region destroy or close event, release
+     * the waiting commit. Events that are null or of any other kind are ignored.
+     */
+    @Override
+    public void beforeLock(LocalRegion owner, CacheEvent event) {
+      if (event == null) {
+        return;
+      }
+      boolean isRegionOperation = event.getOperation().isClear()
+          || event.getOperation().isRegionDestroy() || event.getOperation().isClose();
+      if (isRegionOperation) {
+        releaseOps();
+      }
+    }
+  }
+
+}