You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2017/10/19 15:09:42 UTC
[geode] branch develop updated: GEODE-3521: Allow region set
operations to bootstrap a transaction. (#891)
This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 5a1108f GEODE-3521: Allow region set operations to bootstrap a transaction. (#891)
5a1108f is described below
commit 5a1108f0cd911e317b25613fb436531d57c55fe0
Author: pivotal-eshu <es...@pivotal.io>
AuthorDate: Thu Oct 19 08:09:38 2017 -0700
GEODE-3521: Allow region set operations to bootstrap a transaction. (#891)
* GEODE-3521: Allow region set operations to bootstrap a transaction.
* Now Geode will bootstrap a transaction on a region set op even though
it is a first operation in the transaction.
* Add a SystemPropertyHelper class to held system properties to be set.
* User needs to set a system property to disable this new behavior.
* Do not allow an accessor to bootstrap a transaction.
* Add a PausedTXStateProxyImpl state to allow product to know if a transaction is paused.
*Product now detects if only same thread can unpause a paused transaction.
* Do not bootstrap a new transaction again when transaction is paused.
* Add a test case for testing pause and unpause.
* A thread can not pause a transaction and start a new transaction - use suspend and resume instead.
* Handle a situation that RegionEntry can be set in the KeyInfo when iterate through keys.
---
.../geode/cache/query/internal/DefaultQuery.java | 4 +-
.../cache/query/internal/index/IndexManager.java | 10 +-
.../geode/internal/cache/DistributedRegion.java | 4 +-
.../geode/internal/cache/GemFireCacheImpl.java | 4 +-
.../apache/geode/internal/cache/LocalRegion.java | 38 +-
.../geode/internal/cache/PartitionedRegion.java | 14 +
.../internal/cache/PartitionedRegionDataStore.java | 1 -
.../internal/cache/PartitionedRegionDataView.java | 8 +-
.../internal/cache/PausedTXStateProxyImpl.java | 422 +++++++++++++++++++++
.../apache/geode/internal/cache/TXManagerImpl.java | 111 +++++-
.../org/apache/geode/internal/cache/TXState.java | 4 +-
.../apache/geode/internal/cache/TXStateProxy.java | 6 +-
.../geode/internal/cache/TXStateProxyImpl.java | 55 ++-
.../apache/geode/internal/cache/TXStateStub.java | 7 +-
.../geode/internal/lang/SystemPropertyHelper.java | 57 +++
.../geode/pdx/internal/PeerTypeRegistration.java | 4 +-
.../org/apache/geode/SetOperationTXJUnitTest.java | 167 ++++++++
.../java/org/apache/geode/TXExpiryJUnitTest.java | 4 +-
.../test/java/org/apache/geode/TXJUnitTest.java | 219 ++++++-----
.../geode/cache30/MultiVMRegionTestCase.java | 12 +-
.../cache/ClientServerTransactionDUnitTest.java | 26 +-
.../internal/cache/RemoteTransactionDUnitTest.java | 100 ++---
.../cache/TransactionsWithDeltaDUnitTest.java | 4 +-
.../cache/execute/MyTransactionFunction.java | 18 +-
.../cache/execute/PRSetOperationJTADUnitTest.java | 256 +++++++++++++
.../cache/execute/PRSetOperationTXDUnitTest.java | 237 ++++++++++++
.../internal/jta/ClientServerJTADUnitTest.java | 8 +-
.../internal/jta/SetOperationJTAJUnitTest.java | 183 +++++++++
.../lang/SystemPropertyHelperJUnitTest.java | 56 +++
29 files changed, 1805 insertions(+), 234 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
index c242d89..01ecd8f 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
@@ -538,7 +538,7 @@ public class DefaultQuery implements Query {
QueryObserver observer = QueryObserverHolder.getInstance();
long startTime = CachePerfStats.getStatTime();
- TXStateProxy tx = ((TXManagerImpl) this.cache.getCacheTransactionManager()).internalSuspend();
+ TXStateProxy tx = ((TXManagerImpl) this.cache.getCacheTransactionManager()).pauseTransaction();
try {
observer.startQuery(this);
observer.beforeQueryEvaluation(this.compiledQuery, context);
@@ -575,7 +575,7 @@ public class DefaultQuery implements Query {
updateStatistics(endTime - startTime);
pdxClassToFieldsMap.remove();
pdxClassToMethodsMap.remove();
- ((TXManagerImpl) this.cache.getCacheTransactionManager()).internalResume(tx);
+ ((TXManagerImpl) this.cache.getCacheTransactionManager()).unpauseTransaction(tx);
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java
index 12e7fd0..c61a497 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java
@@ -277,7 +277,7 @@ public class IndexManager {
TXStateProxy tx = null;
if (!((InternalCache) this.region.getCache()).isClient()) {
- tx = ((TXManagerImpl) this.region.getCache().getCacheTransactionManager()).internalSuspend();
+ tx = ((TXManagerImpl) this.region.getCache().getCacheTransactionManager()).pauseTransaction();
}
try {
@@ -431,7 +431,8 @@ public class IndexManager {
DefaultQuery.setPdxReadSerialized(this.region.getCache(), oldReadSerialized);
if (tx != null) {
- ((TXManagerImpl) this.region.getCache().getCacheTransactionManager()).internalResume(tx);
+ ((TXManagerImpl) this.region.getCache().getCacheTransactionManager())
+ .unpauseTransaction(tx);
}
}
}
@@ -1005,7 +1006,7 @@ public class IndexManager {
DefaultQuery.setPdxReadSerialized(this.region.getCache(), true);
TXStateProxy tx = null;
if (!((InternalCache) this.region.getCache()).isClient()) {
- tx = ((TXManagerImpl) this.region.getCache().getCacheTransactionManager()).internalSuspend();
+ tx = ((TXManagerImpl) this.region.getCache().getCacheTransactionManager()).pauseTransaction();
}
try {
@@ -1149,7 +1150,8 @@ public class IndexManager {
} finally {
DefaultQuery.setPdxReadSerialized(this.region.getCache(), false);
if (tx != null) {
- ((TXManagerImpl) this.region.getCache().getCacheTransactionManager()).internalResume(tx);
+ ((TXManagerImpl) this.region.getCache().getCacheTransactionManager())
+ .unpauseTransaction(tx);
}
getCachePerfStats().endIndexUpdate(startPA);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index 20d9f15..c76e813 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -3833,7 +3833,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
protected VersionTag fetchRemoteVersionTag(Object key) {
VersionTag tag = null;
assert this.dataPolicy != DataPolicy.REPLICATE;
- final TXStateProxy tx = cache.getTXMgr().internalSuspend();
+ final TXStateProxy tx = cache.getTXMgr().pauseTransaction();
try {
boolean retry = true;
InternalDistributedMember member = getRandomReplicate();
@@ -3856,7 +3856,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
}
} finally {
if (tx != null) {
- cache.getTXMgr().internalResume(tx);
+ cache.getTXMgr().unpauseTransaction(tx);
}
}
return tag;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index a399a7a..64efb10 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -2155,7 +2155,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
try {
if (this.transactionManager != null) {
- tx = this.transactionManager.internalSuspend();
+ tx = this.transactionManager.pauseTransaction();
}
// do this before closing regions
@@ -2394,7 +2394,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
}
((DynamicRegionFactoryImpl) DynamicRegionFactory.get()).close();
if (this.transactionManager != null) {
- this.transactionManager.internalResume(tx);
+ this.transactionManager.unpauseTransaction(tx);
}
TXCommitMessage.getTracker().clearForCacheClose();
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 46c4e59..35a0cc3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -202,6 +202,7 @@ import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackArgument;
import org.apache.geode.internal.i18n.LocalizedStrings;
+import org.apache.geode.internal.lang.SystemPropertyHelper;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
import org.apache.geode.internal.logging.log4j.LogMarker;
@@ -1836,6 +1837,9 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade
public Set entrySet(boolean recursive) {
checkReadiness();
checkForNoAccess();
+ if (!restoreSetOperationTransactionBehavior) {
+ discoverJTA();
+ }
return basicEntries(recursive);
}
@@ -1858,6 +1862,9 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade
public Set keys() {
checkReadiness();
checkForNoAccess();
+ if (!restoreSetOperationTransactionBehavior) {
+ discoverJTA();
+ }
return new EntriesSet(this, false, IteratorType.KEYS, false);
}
@@ -1877,6 +1884,9 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade
public Collection values() {
checkReadiness();
checkForNoAccess();
+ if (!restoreSetOperationTransactionBehavior) {
+ discoverJTA();
+ }
return new EntriesSet(this, false, IteratorType.VALUES, false);
}
@@ -6133,7 +6143,7 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade
boolean callbackEvents) throws CacheWriterException, TimeoutException {
preDestroyChecks();
- final TXStateProxy tx = this.cache.getTXMgr().internalSuspend();
+ final TXStateProxy tx = this.cache.getTXMgr().pauseTransaction();
try {
boolean acquiredLock = false;
if (lock) {
@@ -6248,7 +6258,7 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade
}
} finally {
- this.cache.getTXMgr().internalResume(tx);
+ this.cache.getTXMgr().unpauseTransaction(tx);
}
}
@@ -6384,6 +6394,9 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade
getDataView().destroyExistingEntry(event, cacheWrite, expectedOldValue);
}
+ protected final boolean restoreSetOperationTransactionBehavior =
+ SystemPropertyHelper.restoreSetOperationTransactionBehavior();
+
/**
* Do the expensive work of discovering an existing JTA transaction Only needs to be called at
* Region.Entry entry points e.g. Region.put, Region.invalidate, etc.
@@ -6397,6 +6410,11 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade
}
}
+ private boolean isTransactionPaused() {
+ TXManagerImpl txMgr = (TXManagerImpl) getCache().getCacheTransactionManager();
+ return txMgr.isTransactionPaused();
+ }
+
/**
* @return true if a transaction is in process
* @since GemFire tx
@@ -6796,7 +6814,7 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade
}
void basicInvalidateRegion(RegionEventImpl event) {
- final TXStateProxy tx = this.cache.getTXMgr().internalSuspend();
+ final TXStateProxy tx = this.cache.getTXMgr().pauseTransaction();
try {
this.regionInvalid = true;
getImageState().setRegionInvalidated(true);
@@ -6840,7 +6858,7 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade
}
} finally {
- this.cache.getTXMgr().internalResume(tx);
+ this.cache.getTXMgr().unpauseTransaction(tx);
}
}
@@ -8277,6 +8295,10 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade
|| jtaTransaction.getStatus() == Status.STATUS_NO_TRANSACTION) {
return null;
}
+ if (isTransactionPaused()) {
+ // Do not bootstrap JTA again, if the transaction has been paused.
+ return null;
+ }
txState = this.cache.getTXMgr().beginJTA();
jtaTransaction.registerSynchronization(txState);
return txState;
@@ -9182,7 +9204,7 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade
if (!alreadyInvalid(key, event)) {
// bug #47716 - don't update if it's already here & invalid
- TXStateProxy txState = this.cache.getTXMgr().internalSuspend();
+ TXStateProxy txState = this.cache.getTXMgr().pauseTransaction();
try {
basicPutEntry(event, 0L);
} catch (ConcurrentCacheModificationException e) {
@@ -9192,7 +9214,7 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade
key, e);
}
} finally {
- this.cache.getTXMgr().internalResume(txState);
+ this.cache.getTXMgr().unpauseTransaction(txState);
}
getCachePerfStats().endPut(startPut, event.isOriginRemote());
}
@@ -11406,6 +11428,10 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade
}
}
+ public boolean canStoreDataLocally() {
+ return this.dataPolicy.withStorage();
+ }
+
/**
* If the specified key is not already associated with a value, associate it with the given value.
* This is equivalent to
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 2713e2b..cc36617 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -4827,6 +4827,11 @@ public class PartitionedRegion extends LocalRegion
return this.dataStore;
}
+ @Override
+ public boolean canStoreDataLocally() {
+ return getDataStore() != null;
+ }
+
/**
* Grab the PartitionedRegionID Lock, this MUST be done in a try block since it may throw an
* exception
@@ -5886,6 +5891,9 @@ public class PartitionedRegion extends LocalRegion
@Override
public Set entrySet(boolean recursive) {
checkReadiness();
+ if (!restoreSetOperationTransactionBehavior) {
+ discoverJTA();
+ }
return Collections.unmodifiableSet(new PREntriesSet());
}
@@ -5949,6 +5957,9 @@ public class PartitionedRegion extends LocalRegion
@Override
public Set keys() {
checkReadiness();
+ if (!restoreSetOperationTransactionBehavior) {
+ discoverJTA();
+ }
return Collections.unmodifiableSet(new KeysSet());
}
@@ -6135,6 +6146,9 @@ public class PartitionedRegion extends LocalRegion
@Override
public Collection values() {
checkReadiness();
+ if (!restoreSetOperationTransactionBehavior) {
+ discoverJTA();
+ }
return Collections.unmodifiableSet(new ValuesSet());
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
index 91d230e..17421b3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
@@ -2212,7 +2212,6 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
invokeBucketReadHook();
try {
if (r != null) {
- Set keys = r.keySet(allowTombstones);
// A copy is made so that the bucket is free to move
ret = new HashSet(r.keySet(allowTombstones));
checkIfBucketMoved(r);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataView.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataView.java
index 998f944..489ad33 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataView.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataView.java
@@ -53,12 +53,12 @@ public class PartitionedRegionDataView extends LocalRegionDataView {
@Override
public Entry getEntry(KeyInfo keyInfo, LocalRegion localRegion, boolean allowTombstones) {
- TXStateProxy tx = localRegion.cache.getTXMgr().internalSuspend();
+ TXStateProxy tx = localRegion.cache.getTXMgr().pauseTransaction();
try {
PartitionedRegion pr = (PartitionedRegion) localRegion;
return pr.nonTXGetEntry(keyInfo, false, allowTombstones);
} finally {
- localRegion.cache.getTXMgr().internalResume(tx);
+ localRegion.cache.getTXMgr().unpauseTransaction(tx);
}
}
@@ -67,12 +67,12 @@ public class PartitionedRegionDataView extends LocalRegionDataView {
Object value, boolean disableCopyOnRead, boolean preferCD,
ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent,
boolean returnTombstones) {
- TXStateProxy tx = r.cache.getTXMgr().internalSuspend();
+ TXStateProxy tx = r.cache.getTXMgr().pauseTransaction();
try {
return r.findObjectInSystem(key, isCreate, tx, generateCallbacks, value, disableCopyOnRead,
preferCD, requestingClient, clientEvent, returnTombstones);
} finally {
- r.cache.getTXMgr().internalResume(tx);
+ r.cache.getTXMgr().unpauseTransaction(tx);
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PausedTXStateProxyImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PausedTXStateProxyImpl.java
new file mode 100644
index 0000000..de9e91e
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PausedTXStateProxyImpl.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CommitConflictException;
+import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.Region.Entry;
+import org.apache.geode.cache.TransactionId;
+import org.apache.geode.cache.UnsupportedOperationInTransactionException;
+import org.apache.geode.cache.client.internal.ServerRegionDataAccess;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
+import org.apache.geode.internal.cache.tx.TransactionalOperation.ServerRegionOperation;
+
+public class PausedTXStateProxyImpl implements TXStateProxy {
+
+ @Override
+ public TransactionId getTransactionId() {
+ return null;
+ }
+
+ @Override
+ public TXRegionState readRegion(LocalRegion r) {
+ return null;
+ }
+
+ @Override
+ public TXRegionState writeRegion(LocalRegion r) {
+ return null;
+ }
+
+ @Override
+ public long getBeginTime() {
+ return 0;
+ }
+
+ @Override
+ public int getChanges() {
+ return 0;
+ }
+
+ @Override
+ public boolean isInProgress() {
+ return false;
+ }
+
+ @Override
+ public int nextModSerialNum() {
+ return 0;
+ }
+
+ @Override
+ public boolean needsLargeModCount() {
+ return false;
+ }
+
+ @Override
+ public void precommit()
+ throws CommitConflictException, UnsupportedOperationInTransactionException {}
+
+ @Override
+ public void commit() throws CommitConflictException {}
+
+ @Override
+ public void rollback() {}
+
+ @Override
+ public List getEvents() {
+ return null;
+ }
+
+ @Override
+ public Cache getCache() {
+ return null;
+ }
+
+ @Override
+ public Collection<LocalRegion> getRegions() {
+ return null;
+ }
+
+ @Override
+ public void invalidateExistingEntry(EntryEventImpl event, boolean invokeCallbacks,
+ boolean forceNewEntry) {}
+
+ @Override
+ public Entry getEntry(KeyInfo keyInfo, LocalRegion region, boolean allowTombstones) {
+ return null;
+ }
+
+ @Override
+ public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion, boolean updateStats,
+ boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent,
+ boolean returnTombstones, boolean retainResult) {
+ return null;
+ }
+
+ @Override
+ public TXEvent getEvent() {
+ return null;
+ }
+
+ @Override
+ public TXRegionState txWriteRegion(LocalRegion localRegion, KeyInfo entryKey) {
+ return null;
+ }
+
+ @Override
+ public TXRegionState txReadRegion(LocalRegion localRegion) {
+ return null;
+ }
+
+ @Override
+ public boolean txPutEntry(EntryEventImpl event, boolean ifNew, boolean requireOldValue,
+ boolean checkResources, Object expectedOldValue) {
+ return false;
+ }
+
+ @Override
+ public TXEntryState txReadEntry(KeyInfo entryKey, LocalRegion localRegion, boolean rememberRead,
+ boolean createTxEntryIfAbsent) {
+ return null;
+ }
+
+ @Override
+ public void rmRegion(LocalRegion r) {
+
+ }
+
+ @Override
+ public boolean isInProgressAndSameAs(TXStateInterface state) {
+ return false;
+ }
+
+ @Override
+ public boolean isFireCallbacks() {
+ return false;
+ }
+
+ @Override
+ public ReentrantLock getLock() {
+ return null;
+ }
+
+ @Override
+ public boolean isRealDealLocal() {
+ return false;
+ }
+
+ @Override
+ public boolean isMemberIdForwardingRequired() {
+ return false;
+ }
+
+ @Override
+ public InternalDistributedMember getOriginatingMember() {
+ return null;
+ }
+
+ @Override
+ public TXCommitMessage getCommitMessage() {
+ return null;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public boolean isTxState() {
+ return false;
+ }
+
+ @Override
+ public boolean isTxStateStub() {
+ return false;
+ }
+
+ @Override
+ public boolean isTxStateProxy() {
+ return false;
+ }
+
+ @Override
+ public boolean isDistTx() {
+ return false;
+ }
+
+ @Override
+ public boolean isCreatedOnDistTxCoordinator() {
+ return false;
+ }
+
+ @Override
+ public void beforeCompletion() {}
+
+ @Override
+ public void afterCompletion(int status) {}
+
+ @Override
+ public void destroyExistingEntry(EntryEventImpl event, boolean cacheWrite,
+ Object expectedOldValue) throws EntryNotFoundException {}
+
+ @Override
+ public int entryCount(LocalRegion localRegion) {
+ return 0;
+ }
+
+ @Override
+ public Object getValueInVM(KeyInfo keyInfo, LocalRegion localRegion, boolean rememberRead) {
+ return null;
+ }
+
+ @Override
+ public boolean containsKey(KeyInfo keyInfo, LocalRegion localRegion) {
+ return false;
+ }
+
+ @Override
+ public boolean containsValueForKey(KeyInfo keyInfo, LocalRegion localRegion) {
+ return false;
+ }
+
+ @Override
+ public Entry getEntryOnRemote(KeyInfo key, LocalRegion localRegion, boolean allowTombstones)
+ throws DataLocationException {
+ return null;
+ }
+
+ @Override
+ public boolean putEntry(EntryEventImpl event, boolean ifNew, boolean ifOld,
+ Object expectedOldValue, boolean requireOldValue, long lastModified,
+ boolean overwriteDestroyed) {
+ return false;
+ }
+
+ @Override
+ public boolean putEntryOnRemote(EntryEventImpl event, boolean ifNew, boolean ifOld,
+ Object expectedOldValue, boolean requireOldValue, long lastModified,
+ boolean overwriteDestroyed) throws DataLocationException {
+ return false;
+ }
+
+ @Override
+ public void destroyOnRemote(EntryEventImpl event, boolean cacheWrite, Object expectedOldValue)
+ throws DataLocationException {}
+
+ @Override
+ public void invalidateOnRemote(EntryEventImpl event, boolean invokeCallbacks,
+ boolean forceNewEntry) throws DataLocationException {}
+
+ @Override
+ public boolean isDeferredStats() {
+ return false;
+ }
+
+ @Override
+ public Object findObject(KeyInfo key, LocalRegion r, boolean isCreate, boolean generateCallbacks,
+ Object value, boolean disableCopyOnRead, boolean preferCD,
+ ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent,
+ boolean returnTombstones) {
+ return null;
+ }
+
+ @Override
+ public Object getEntryForIterator(KeyInfo key, LocalRegion currRgn, boolean rememberReads,
+ boolean allowTombstones) {
+ return null;
+ }
+
+ @Override
+ public Object getKeyForIterator(KeyInfo keyInfo, LocalRegion currRgn, boolean rememberReads,
+ boolean allowTombstones) {
+ return null;
+ }
+
+ @Override
+ public Set getAdditionalKeysForIterator(LocalRegion currRgn) {
+ return null;
+ }
+
+ @Override
+ public Collection<?> getRegionKeysForIteration(LocalRegion currRegion) {
+ return null;
+ }
+
+ @Override
+ public Object getSerializedValue(LocalRegion localRegion, KeyInfo key, boolean doNotLockEntry,
+ ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent,
+ boolean returnTombstones) throws DataLocationException {
+ return null;
+ }
+
+ @Override
+ public void checkSupportsRegionDestroy() throws UnsupportedOperationInTransactionException {}
+
+ @Override
+ public void checkSupportsRegionInvalidate() throws UnsupportedOperationInTransactionException {}
+
+ @Override
+ public void checkSupportsRegionClear() throws UnsupportedOperationInTransactionException {}
+
+ @Override
+ public Set getBucketKeys(LocalRegion localRegion, int bucketId, boolean allowTombstones) {
+ return null;
+ }
+
+ @Override
+ public void postPutAll(DistributedPutAllOperation putallOp, VersionedObjectList successfulPuts,
+ LocalRegion region) {}
+
+ @Override
+ public void postRemoveAll(DistributedRemoveAllOperation op, VersionedObjectList successfulOps,
+ LocalRegion region) {}
+
+ @Override
+ public Entry accessEntry(KeyInfo keyInfo, LocalRegion localRegion) {
+ return null;
+ }
+
+ @Override
+ public void updateEntryVersion(EntryEventImpl event) throws EntryNotFoundException {}
+
+ @Override
+ public void checkJTA(String errmsg) throws IllegalStateException {}
+
+ @Override
+ public void setIsJTA(boolean isJTA) {}
+
+ @Override
+ public TXId getTxId() {
+ return null;
+ }
+
+ @Override
+ public TXManagerImpl getTxMgr() {
+ return null;
+ }
+
+ @Override
+ public void setLocalTXState(TXStateInterface state) {}
+
+ @Override
+ public void setTarget(DistributedMember target) {}
+
+ @Override
+ public DistributedMember getTarget() {
+ return null;
+ }
+
+ @Override
+ public boolean isCommitOnBehalfOfRemoteStub() {
+ return false;
+ }
+
+ @Override
+ public boolean setCommitOnBehalfOfRemoteStub(boolean requestedByOwner) {
+ return false;
+ }
+
+ @Override
+ public boolean isOnBehalfOfClient() {
+ return false;
+ }
+
+ @Override
+ public boolean isJCATransaction() {
+ return false;
+ }
+
+ @Override
+ public void setJCATransaction() {}
+
+ @Override
+ public void setSynchronizationRunnable(TXSynchronizationRunnable sync) {}
+
+ @Override
+ public TXSynchronizationRunnable getSynchronizationRunnable() {
+ return null;
+ }
+
+ @Override
+ public void suspend() {}
+
+ @Override
+ public void resume() {}
+
+ @Override
+ public void recordTXOperation(ServerRegionDataAccess proxy, ServerRegionOperation op, Object key,
+ Object[] arguments) {}
+
+ @Override
+ public int operationCount() {
+ return 0;
+ }
+
+ @Override
+ public void setInProgress(boolean progress) {}
+
+ @Override
+ public void updateProxyServer(InternalDistributedMember proxy) {}
+
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
index 6afed9e..ebd37cc 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
@@ -37,8 +37,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.LockSupport;
-import org.apache.logging.log4j.Logger;
-
import org.apache.geode.DataSerializer;
import org.apache.geode.GemFireException;
import org.apache.geode.InternalGemFireError;
@@ -68,6 +66,7 @@ import org.apache.geode.internal.logging.log4j.LocalizedMessage;
import org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap;
import org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap.HashEntry;
import org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap.MapCallback;
+import org.apache.logging.log4j.Logger;
/**
* The internal implementation of the {@link CacheTransactionManager} interface returned by
@@ -317,6 +316,13 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
.toLocalizedString(tid));
}
}
+ {
+ TXStateProxy curProxy = txContext.get();
+ if (curProxy == PAUSED) {
+ throw new java.lang.IllegalStateException(
+ "Current thread has paused its transaction so it can not start a new transaction");
+ }
+ }
TXId id = new TXId(this.distributionMgrId, this.uniqId.incrementAndGet());
TXStateProxyImpl proxy = null;
if (isDistributed()) {
@@ -591,6 +597,10 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
*/
public TXStateProxy getTXState() {
TXStateProxy tsp = txContext.get();
+ if (tsp == PAUSED) {
+ // treats paused transaction as no transaction.
+ return null;
+ }
if (tsp != null && !tsp.isInProgress()) {
this.txContext.set(null);
tsp = null;
@@ -609,6 +619,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
public boolean setInProgress(boolean progress) {
boolean retVal = false;
TXStateProxy tsp = txContext.get();
+ assert tsp != PAUSED;
if (tsp != null) {
retVal = tsp.isInProgress();
tsp.setInProgress(progress);
@@ -670,29 +681,92 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
}
}
+ private static final TXStateProxy PAUSED = new PausedTXStateProxyImpl();
+
+ /**
+ * If the current thread is in a transaction then pause will cause it to no longer be in a
+ * transaction. The same thread is expected to unpause/resume the transaction later.
+ *
+ * @return the state of the transaction or null. Pass this value to
+ * {@link TXManagerImpl#unpauseTransaction} to reactivate the puased/suspended
+ * transaction.
+ */
+ public TXStateProxy pauseTransaction() {
+ return internalSuspend(true);
+ }
+
/**
* If the current thread is in a transaction then suspend will cause it to no longer be in a
- * transaction.
+ * transaction. Currently only used in testing.
*
- * @return the state of the transaction or null. Pass this value to {@link TXManagerImpl#resume}
- * to reactivate the suspended transaction.
+ * @return the state of the transaction or null. to reactivate the suspended transaction.
+ * @deprecated use {@link TXManagerImpl#pauseTransaction} or
+ * {@link CacheTransactionManager#suspend} instead
*/
+ @Deprecated
public TXStateProxy internalSuspend() {
+ return internalSuspend(false);
+ }
+
+ /**
+ * If the current thread is in a transaction then suspend will cause it to no longer be in a
+ * transaction.
+ *
+ * @param needToResumeBySameThread whether a suspended transaction needs to be resumed by the same
+ * thread.
+ * @return the state of the transaction or null. Pass this value to
+ * {@link TXManagerImpl#internalResume(TXStateProxy, boolean)} to reactivate the suspended
+ * transaction.
+ */
+ private TXStateProxy internalSuspend(boolean needToResumeBySameThread) {
TXStateProxy result = getTXState();
if (result != null) {
result.suspend();
- setTXState(null);
+ if (needToResumeBySameThread) {
+ setTXState(PAUSED);
+ } else {
+ setTXState(null);
+ }
}
return result;
}
/**
- * Activates the specified transaction on the calling thread.
- *
+ * Activates the specified transaction on the calling thread. Only the same thread that pause the
+ * transaction can unpause it.
+ *
+ * @param tx the transaction to be unpaused.
+ * @throws IllegalStateException if this thread already has an active transaction or this thread
+ * did not pause the transaction.
+ */
+ public void unpauseTransaction(TXStateProxy tx) {
+ internalResume(tx, true);
+ }
+
+ /**
+ * Activates the specified transaction on the calling thread. Does not require the same thread to
+ * resume it. Currently only used in testing.
+ *
* @param tx the transaction to activate.
* @throws IllegalStateException if this thread already has an active transaction
+ *
+ * @deprecated use {@link TXManagerImpl#unpauseTransaction} or
+ * {@link CacheTransactionManager#resume} instead
*/
+ @Deprecated
public void internalResume(TXStateProxy tx) {
+ internalResume(tx, false);
+ }
+
+ /**
+ * Activates the specified transaction on the calling thread.
+ *
+ * @param tx the transaction to activate.
+ * @param needToResumeBySameThread whether a suspended transaction needs to be resumed by the same
+ * thread.
+ * @throws IllegalStateException if this thread already has an active transaction
+ */
+ private void internalResume(TXStateProxy tx, boolean needToResumeBySameThread) {
if (tx != null) {
TransactionId tid = getTransactionId();
if (tid != null) {
@@ -700,11 +774,23 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
LocalizedStrings.TXManagerImpl_TRANSACTION_0_ALREADY_IN_PROGRESS
.toLocalizedString(tid));
}
+ if (needToResumeBySameThread) {
+ TXStateProxy result = txContext.get();
+ if (result != PAUSED) {
+ throw new java.lang.IllegalStateException(
+ "try to unpause a transaction not paused by the same thread");
+ }
+ }
setTXState(tx);
+
tx.resume();
}
}
+ public boolean isTransactionPaused() {
+ return txContext.get() == PAUSED;
+ }
+
/**
* @deprecated use internalResume instead
*/
@@ -750,7 +836,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
public int getMyTXUniqueId() {
TXStateProxy t = txContext.get();
- if (t != null) {
+ if (t != null && t != PAUSED) {
return t.getTxId().getUniqId();
} else {
return NOTX;
@@ -1229,7 +1315,8 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
TXStateProxy result = getTXState();
if (result != null) {
TransactionId txId = result.getTransactionId();
- internalSuspend();
+ result.suspend();
+ setTXState(null);
this.suspendedTXs.put(txId, result);
// wake up waiting threads
Queue<Thread> waitingThreads = this.waitMap.get(txId);
@@ -1286,7 +1373,9 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
private void resumeProxy(TXStateProxy txProxy) {
assert txProxy != null;
- internalResume(txProxy);
+ assert getTXState() == null;
+ setTXState(txProxy);
+ txProxy.resume();
SystemTimerTask task = this.expiryTasks.remove(txProxy.getTransactionId());
if (task != null) {
if (task.cancel()) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index 662f7b0..2d109d8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -1666,11 +1666,11 @@ public class TXState implements TXStateInterface {
if (!pr.getBucketPrimary(curr.getBucketId()).equals(pr.cache.getMyId())) {
// to fix bug 47893 suspend the tx before calling nonTXGetEntry
final TXManagerImpl txmgr = pr.getGemFireCache().getTXMgr();
- final TXStateProxy tx = txmgr.internalSuspend();
+ final TXStateProxy tx = txmgr.pauseTransaction();
try {
return pr.nonTXGetEntry(curr, false, allowTombstones);
} finally {
- txmgr.internalResume(tx);
+ txmgr.unpauseTransaction(tx);
}
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxy.java
index d392c41..c7ba38c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxy.java
@@ -64,14 +64,12 @@ public interface TXStateProxy extends TXStateInterface {
public TXSynchronizationRunnable getSynchronizationRunnable();
/**
- * Called by {@link TXManagerImpl#internalSuspend()} to perform additional tasks required to
- * suspend a transaction
+ * Perform additional tasks required by the proxy to suspend a transaction
*/
public void suspend();
/**
- * Called by {@link TXManagerImpl#internalResume(TXStateProxy)} to perform additional tasks
- * required to resume a transaction
+ * Perform additional tasks required by the proxy to resume a transaction
*/
public void resume();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
index 6b6b712..7a1e202 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
@@ -36,12 +36,14 @@ import org.apache.geode.cache.TransactionId;
import org.apache.geode.cache.UnsupportedOperationInTransactionException;
import org.apache.geode.cache.client.internal.ServerRegionDataAccess;
import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
import org.apache.geode.internal.cache.tx.ClientTXStateStub;
import org.apache.geode.internal.cache.tx.TransactionalOperation.ServerRegionOperation;
import org.apache.geode.internal.i18n.LocalizedStrings;
+import org.apache.geode.internal.lang.SystemPropertyHelper;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
@@ -132,6 +134,7 @@ public class TXStateProxyImpl implements TXStateProxy {
// wait for the region to be initialized fixes bug 44652
r.waitOnInitialization(r.initializationLatchBeforeGetInitialImage);
target = r.getOwnerForKey(key);
+
if (target == null || target.equals(this.txMgr.getDM().getId())) {
this.realDeal = new TXState(this, false);
} else {
@@ -463,7 +466,7 @@ public class TXStateProxyImpl implements TXStateProxy {
TXStateProxy txp = null;
boolean txUnlocked = false;
if (resetTXState) {
- txp = getTxMgr().internalSuspend();
+ txp = getTxMgr().pauseTransaction();
} else {
if (getLock().isHeldByCurrentThread()) {
txUnlocked = true; // bug #42945 - hang trying to compute size for PR
@@ -477,7 +480,7 @@ public class TXStateProxyImpl implements TXStateProxy {
return getRealDeal(null, localRegion).entryCount(localRegion);
} finally {
if (resetTXState) {
- getTxMgr().internalResume(txp);
+ getTxMgr().unpauseTransaction(txp);
} else if (txUnlocked) {
getLock().lock();
}
@@ -506,12 +509,15 @@ public class TXStateProxyImpl implements TXStateProxy {
return getRealDeal(null, currRgn).getAdditionalKeysForIterator(currRgn);
}
+ protected final boolean restoreSetOperationTransactionBehavior =
+ SystemPropertyHelper.restoreSetOperationTransactionBehavior();
+
public Object getEntryForIterator(KeyInfo key, LocalRegion currRgn, boolean rememberReads,
boolean allowTombstones) {
- boolean resetTxState = this.realDeal == null;
+ boolean resetTxState = isTransactionInternalSuspendNeeded(currRgn);
TXStateProxy txp = null;
if (resetTxState) {
- txp = getTxMgr().internalSuspend();
+ txp = getTxMgr().pauseTransaction();
}
try {
if (resetTxState) {
@@ -521,17 +527,23 @@ public class TXStateProxyImpl implements TXStateProxy {
allowTombstones);
} finally {
if (resetTxState) {
- getTxMgr().internalResume(txp);
+ getTxMgr().unpauseTransaction(txp);
}
}
}
+ private boolean isTransactionInternalSuspendNeeded(LocalRegion region) {
+ boolean resetTxState = this.realDeal == null
+ && (!region.canStoreDataLocally() || restoreSetOperationTransactionBehavior);
+ return resetTxState;
+ }
+
public Object getKeyForIterator(KeyInfo keyInfo, LocalRegion currRgn, boolean rememberReads,
boolean allowTombstones) {
- boolean resetTxState = this.realDeal == null;
+ boolean resetTxState = isTransactionInternalSuspendNeeded(currRgn);
TXStateProxy txp = null;
if (resetTxState) {
- txp = getTxMgr().internalSuspend();
+ txp = getTxMgr().pauseTransaction();
}
try {
if (resetTxState) {
@@ -542,7 +554,7 @@ public class TXStateProxyImpl implements TXStateProxy {
allowTombstones);
} finally {
if (resetTxState) {
- getTxMgr().internalResume(txp);
+ getTxMgr().unpauseTransaction(txp);
}
}
}
@@ -635,11 +647,10 @@ public class TXStateProxyImpl implements TXStateProxy {
}
public Set getBucketKeys(LocalRegion localRegion, int bucketId, boolean allowTombstones) {
- // if this the first operation in a transaction, reset txState
- boolean resetTxState = this.realDeal == null;
+ boolean resetTxState = isTransactionInternalSuspendNeeded(localRegion);
TXStateProxy txp = null;
if (resetTxState) {
- txp = getTxMgr().internalSuspend();
+ txp = getTxMgr().pauseTransaction();
}
try {
if (resetTxState) {
@@ -648,7 +659,7 @@ public class TXStateProxyImpl implements TXStateProxy {
return getRealDeal(null, localRegion).getBucketKeys(localRegion, bucketId, false);
} finally {
if (resetTxState) {
- getTxMgr().internalResume(txp);
+ getTxMgr().unpauseTransaction(txp);
}
}
}
@@ -678,7 +689,21 @@ public class TXStateProxyImpl implements TXStateProxy {
if (currRegion.isUsedForPartitionedRegionBucket()) {
return currRegion.getRegionKeysForIteration();
} else {
- return getRealDeal(null, currRegion).getRegionKeysForIteration(currRegion);
+ boolean resetTxState = isTransactionInternalSuspendNeeded(currRegion);
+ TXStateProxy txp = null;
+ if (resetTxState) {
+ txp = getTxMgr().pauseTransaction();
+ }
+ try {
+ if (resetTxState) {
+ return currRegion.getSharedDataView().getRegionKeysForIteration(currRegion);
+ }
+ return getRealDeal(null, currRegion).getRegionKeysForIteration(currRegion);
+ } finally {
+ if (resetTxState) {
+ getTxMgr().unpauseTransaction(txp);
+ }
+ }
}
}
@@ -709,6 +734,10 @@ public class TXStateProxyImpl implements TXStateProxy {
return null;
}
+ public boolean hasRealDeal() {
+ return this.realDeal != null;
+ }
+
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateStub.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateStub.java
index 6055705..76dbca3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateStub.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateStub.java
@@ -39,6 +39,7 @@ import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
import org.apache.geode.internal.cache.tx.TXRegionStub;
import org.apache.geode.internal.i18n.LocalizedStrings;
+import org.apache.geode.internal.logging.LogService;
/**
* TXStateStub lives on the accessor node when we are remoting a transaction. It is a stub for
@@ -485,7 +486,11 @@ public abstract class TXStateStub implements TXStateInterface {
*/
public Object getKeyForIterator(KeyInfo keyInfo, LocalRegion currRgn, boolean rememberReads,
boolean allowTombstones) {
- return keyInfo.getKey();
+ Object key = keyInfo.getKey();
+ if (key instanceof RegionEntry) {
+ return ((RegionEntry) key).getKey();
+ }
+ return key;
}
/*
diff --git a/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java b/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java
new file mode 100644
index 0000000..97d6343
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.lang;
+
+/**
+ * The SystemPropertyHelper class is an helper class for accessing system properties used in geode.
+ * The method name to get the system property should be the same as the system property name.
+ *
+ * @since Geode 1.4.0
+ */
+
+public class SystemPropertyHelper {
+ private static final String GEODE_PREFIX = "geode.";
+ private static final String GEMFIRE_PREFIX = "gemfire.";
+
+ /**
+ * This method will try to look up "geode." and "gemfire." versions of the system property. It
+ * will check and prefer "geode." setting first, then try to check "gemfire." setting.
+ *
+ * @param name system property name set in Geode
+ * @return a boolean value of the system property
+ */
+ private static boolean getProductBooleanProperty(String name) {
+ String property = System.getProperty(GEODE_PREFIX + name);
+ if (property != null) {
+ return Boolean.getBoolean(GEODE_PREFIX + name);
+ }
+ return Boolean.getBoolean(GEMFIRE_PREFIX + name);
+ }
+
+ /**
+ * As of Geode 1.4.0, a region set operation will be in a transaction even if it is the first
+ * operation in the transaction.
+ *
+ * In previous releases, a region operation is not in a transaction if it is the first operation
+ * of the transaction.
+ *
+ * Setting this system property to true will restore the previous behavior.
+ *
+ * @since Geode 1.4.0
+ */
+ public static boolean restoreSetOperationTransactionBehavior() {
+ return getProductBooleanProperty("restoreSetOperationTransactionBehavior");
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/pdx/internal/PeerTypeRegistration.java b/geode-core/src/main/java/org/apache/geode/pdx/internal/PeerTypeRegistration.java
index ea9bf3e..ad56b78 100644
--- a/geode-core/src/main/java/org/apache/geode/pdx/internal/PeerTypeRegistration.java
+++ b/geode-core/src/main/java/org/apache/geode/pdx/internal/PeerTypeRegistration.java
@@ -618,13 +618,13 @@ public class PeerTypeRegistration implements TypeRegistration {
private TXStateProxy suspendTX() {
InternalCache cache = (InternalCache) getIdToType().getRegionService();
TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
- return txManager.internalSuspend();
+ return txManager.pauseTransaction();
}
private void resumeTX(TXStateProxy state) {
if (state != null) {
TXManagerImpl txManager = state.getTxMgr();
- txManager.internalResume(state);
+ txManager.unpauseTransaction(state);
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/SetOperationTXJUnitTest.java b/geode-core/src/test/java/org/apache/geode/SetOperationTXJUnitTest.java
new file mode 100644
index 0000000..78cf324
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/SetOperationTXJUnitTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode;
+
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.junit.Assert.*;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.CacheTransactionManager;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+import org.apache.logging.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+
+@Category(IntegrationTest.class)
+@RunWith(JUnitParamsRunner.class)
+public class SetOperationTXJUnitTest {
+
+ private static final Logger logger = LogService.getLogger();
+ private static final String REGION_NAME = "region1";
+
+ private Map<Long, String> testData;
+ private Cache cache;
+
+ @Rule
+ public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
+
+ @Before
+ public void setup() {
+ testData = new HashMap<>();
+ testData.put(1L, "value1");
+ testData.put(2L, "value2");
+ testData.put(3L, "duplicateValue");
+ testData.put(4L, "duplicateValue");
+ }
+
+ @After
+ public void tearDownTest() throws Exception {
+ closeCache();
+ }
+
+ @Test
+ @Parameters({"true", "false"})
+ public void testRegionKeysetWithTx(boolean disableSetOpToStartTx) {
+ Region<Long, String> region = setupAndLoadRegion(disableSetOpToStartTx);
+ CacheTransactionManager txMgr = cache.getCacheTransactionManager();
+ try {
+ txMgr.begin();
+ Collection<Long> set = region.keySet();
+ set.forEach((key) -> assertTrue(testData.keySet().contains(key)));
+ } finally {
+ validateTXManager(disableSetOpToStartTx);
+ txMgr.rollback();
+ }
+ }
+
+ @Test
+ @Parameters({"true", "false"})
+ public void testRegionValuesWithTx(boolean disableSetOpToStartTx) {
+ Region<Long, String> region = setupAndLoadRegion(disableSetOpToStartTx);
+ CacheTransactionManager txMgr = cache.getCacheTransactionManager();
+ try {
+ txMgr.begin();
+ Collection<String> set = region.values();
+ set.forEach((value) -> assertTrue(testData.values().contains(value)));
+ } finally {
+ validateTXManager(disableSetOpToStartTx);
+ txMgr.rollback();
+ }
+ }
+
+ @Test
+ @Parameters({"true", "false"})
+ public void testRegionEntriesWithTx(boolean disableSetOpToStartTx) {
+ Region<Long, String> region = setupAndLoadRegion(disableSetOpToStartTx);
+ CacheTransactionManager txMgr = cache.getCacheTransactionManager();
+ try {
+ txMgr.begin();
+ Collection<Map.Entry<Long, String>> set = region.entrySet();
+ set.forEach((entry) -> {
+ assertTrue(testData.values().contains(entry.getValue()));
+ assertTrue(testData.keySet().contains(entry.getKey()));
+ });
+ } finally {
+ validateTXManager(disableSetOpToStartTx);
+ txMgr.rollback();
+ }
+ }
+
+ private Region<Long, String> setupAndLoadRegion(boolean disableSetOpToStartTx) {
+ this.cache = createCache(disableSetOpToStartTx);
+ Region<Long, String> region = createRegion(cache);
+ testData.forEach((k, v) -> region.put(k, v));
+ return region;
+ }
+
+ private void validateTXManager(boolean disableSetOpToStartTx) {
+ assertNotNull(TXManagerImpl.getCurrentTXState());
+ if (disableSetOpToStartTx) {
+ assertFalse(((TXStateProxyImpl) TXManagerImpl.getCurrentTXState()).hasRealDeal());
+ } else {
+ assertTrue(((TXStateProxyImpl) TXManagerImpl.getCurrentTXState()).hasRealDeal());
+ }
+ }
+
+ protected Region<Long, String> createRegion(Cache cache) {
+ RegionFactory<Long, String> rf = cache.createRegionFactory(RegionShortcut.REPLICATE);
+ Region<Long, String> r = rf.create(REGION_NAME);
+ return r;
+ }
+
+ final String restoreSetOperationTransactionBehavior = "restoreSetOperationTransactionBehavior";
+ final String RESTORE_SET_OPERATION_PROPERTY =
+ (System.currentTimeMillis() % 2 == 0 ? DistributionConfig.GEMFIRE_PREFIX : "geode.")
+ + restoreSetOperationTransactionBehavior;
+
+ private Cache createCache(boolean disableSetOpToStartTx) {
+ if (disableSetOpToStartTx) {
+ logger.info("setting system property {} to true ", RESTORE_SET_OPERATION_PROPERTY);
+ System.setProperty(RESTORE_SET_OPERATION_PROPERTY, "true");
+ }
+ CacheFactory cf = new CacheFactory().set(MCAST_PORT, "0");
+ this.cache = (GemFireCacheImpl) cf.create();
+ return this.cache;
+ }
+
+ protected void closeCache() {
+ if (this.cache != null) {
+ Cache c = this.cache;
+ this.cache = null;
+ c.close();
+ }
+ }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/TXExpiryJUnitTest.java b/geode-core/src/test/java/org/apache/geode/TXExpiryJUnitTest.java
index 077d3e6..f5903ed 100644
--- a/geode-core/src/test/java/org/apache/geode/TXExpiryJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/TXExpiryJUnitTest.java
@@ -198,9 +198,9 @@ public class TXExpiryJUnitTest {
} else {
checkVal = "conflictVal";
final TXManagerImpl txMgrImpl = (TXManagerImpl) this.txMgr;
- TXStateProxy tx = txMgrImpl.internalSuspend();
+ TXStateProxy tx = txMgrImpl.pauseTransaction();
exprReg.put("key0", checkVal);
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
try {
this.txMgr.commit();
fail("Expected CommitConflictException!");
diff --git a/geode-core/src/test/java/org/apache/geode/TXJUnitTest.java b/geode-core/src/test/java/org/apache/geode/TXJUnitTest.java
index cf1281d..8f36a9a 100644
--- a/geode-core/src/test/java/org/apache/geode/TXJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/TXJUnitTest.java
@@ -15,6 +15,7 @@
package org.apache.geode;
import static org.apache.geode.distributed.ConfigurationProperties.*;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.*;
import java.util.ArrayList;
@@ -92,6 +93,7 @@ import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.TXManagerImpl;
import org.apache.geode.internal.cache.TXStateProxy;
import org.apache.geode.internal.i18n.LocalizedStrings;
+import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.util.StopWatch;
import org.apache.geode.test.junit.categories.IntegrationTest;
@@ -4332,11 +4334,11 @@ public class TXJUnitTest {
this.txMgr.begin();
reg1.create("key1", "txValue");
assertEquals("txValue", reg1.getEntry("key1").getValue());
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
assertTrue(!reg1.containsKey("key1"));
assertEquals("LV 4", reg1.get("key1"));
assertTrue(reg1.containsKey("key1"));
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
assertEquals("txValue", reg1.getEntry("key1").getValue());
assertEquals("txValue", reg1.get("key1"));
try {
@@ -4377,12 +4379,12 @@ public class TXJUnitTest {
this.txMgr.begin();
assertEquals("LV 8", reg1.get("key1"));
assertEquals("LV 8", reg1.getEntry("key1").getValue());
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
assertTrue(!reg1.containsKey("key1"));
reg1.create("key1", "txValue");
assertTrue(reg1.containsKey("key1"));
assertEquals("txValue", reg1.get("key1"));
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
assertEquals("LV 8", reg1.getEntry("key1").getValue());
try {
this.txMgr.commit(); // should conflict
@@ -4397,10 +4399,10 @@ public class TXJUnitTest {
// TX load conflict: no-inital state, tx load->update, committed update
{
final TXManagerImpl txMgrImpl = (TXManagerImpl) this.txMgr;
- TXStateProxy tx;
+ TransactionId txId = null;
this.txMgr.begin();
reg1.create("key1", "txValue");
- tx = txMgrImpl.internalSuspend();
+ txId = txMgrImpl.suspend();
assertTrue(!reg1.containsKey("key1"));
// new transaction, load(create) + put
this.txMgr.begin();
@@ -4413,7 +4415,7 @@ public class TXJUnitTest {
assertTrue(reg1.containsKey("key1"));
assertEquals("txValue2", reg1.get("key1"));
assertEquals("txValue2", reg1.getEntry("key1").getValue());
- txMgrImpl.internalResume(tx);
+ txMgrImpl.resume(txId);
assertEquals("txValue", reg1.getEntry("key1").getValue());
assertEquals("txValue", reg1.get("key1"));
try {
@@ -4755,10 +4757,10 @@ public class TXJUnitTest {
} catch (EntryExistsException ok) {
}
// begin other tx simulation
- TXStateProxy tx = txMgrImpl.internalSuspend();
+ TXStateProxy tx = txMgrImpl.pauseTransaction();
this.region.put("stats1", "stats success1");
this.region.put("stats2", "stats success2");
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
// end other tx simulation
pause(SLEEP_MS);
try {
@@ -4905,12 +4907,36 @@ public class TXJUnitTest {
}
@Test
+ public void testPauseUnpause() {
+ TXManagerImpl txMgrImpl = (TXManagerImpl) this.txMgr;
+ assertTrue(!this.txMgr.exists());
+ assertEquals(null, txMgrImpl.pauseTransaction());
+ TXStateProxy txProxy = null;
+ txMgrImpl.unpauseTransaction(txProxy);
+ assertTrue(!this.txMgr.exists());
+
+ this.txMgr.begin();
+ TransactionId origId = this.txMgr.getTransactionId();
+ assertTrue(this.txMgr.exists());
+ {
+ TXStateProxy tx = txMgrImpl.pauseTransaction();
+ assertTrue(!this.txMgr.exists());
+ assertThatThrownBy(() -> this.txMgr.begin()).isInstanceOf(IllegalStateException.class);
+ assertTrue(!this.txMgr.exists());
+ txMgrImpl.unpauseTransaction(tx);
+ }
+ assertTrue(this.txMgr.exists());
+ assertEquals(origId, this.txMgr.getTransactionId());
+ this.txMgr.rollback();
+ }
+
+ @Test
public void testSuspendResume() {
TXManagerImpl txMgrImpl = (TXManagerImpl) this.txMgr;
assertTrue(!this.txMgr.exists());
- assertEquals(null, txMgrImpl.internalSuspend());
+ assertEquals(null, txMgrImpl.pauseTransaction());
TXStateProxy txProxy = null;
- txMgrImpl.internalResume(txProxy);
+ txMgrImpl.unpauseTransaction(txProxy);
assertTrue(!this.txMgr.exists());
this.txMgr.begin();
@@ -4924,6 +4950,7 @@ public class TXJUnitTest {
txMgrImpl.internalResume(tx);
fail("expected IllegalStateException");
} catch (IllegalStateException expected) {
+ LogService.getLogger().info("expected ", expected);
}
this.txMgr.rollback();
assertTrue(!this.txMgr.exists());
@@ -5040,9 +5067,9 @@ public class TXJUnitTest {
this.region.put("key1", "value1"); // non-tx
txMgrImpl.begin();
assertEquals("value1", this.region.get("key1"));
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.put("key1", "value2"); // non-tx
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
assertEquals("value1", this.region.get("key1"));
txMgrImpl.commit();
@@ -5051,9 +5078,9 @@ public class TXJUnitTest {
this.region.put("key1", "value1"); // non-tx
txMgrImpl.begin();
assertEquals("value1", this.region.get("key1"));
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.put("key1", "value2"); // non-tx
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
assertEquals("value1", this.region.get("key1"));
this.region.put("key1", "value3");
assertEquals("value3", this.region.get("key1"));
@@ -5067,9 +5094,9 @@ public class TXJUnitTest {
this.region.put("key1", "value1"); // non-tx
txMgrImpl.begin();
this.region.getEntry("key1");
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.put("key1", "value2"); // non-tx
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
assertEquals("value1", this.region.get("key1"));
txMgrImpl.commit();
@@ -5078,9 +5105,9 @@ public class TXJUnitTest {
this.region.put("key1", "value1"); // non-tx
txMgrImpl.begin();
this.region.getEntry("key1");
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.put("key1", "value2"); // non-tx
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
this.region.put("key1", "value3");
try {
txMgrImpl.commit();
@@ -5093,9 +5120,9 @@ public class TXJUnitTest {
txMgrImpl.begin();
this.region.get("key1"); // bootstrap the tx, entrySet does not
this.region.entrySet(false).iterator().next();
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.put("key1", "value2"); // non-tx
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
assertEquals("value1", this.region.get("key1"));
txMgrImpl.commit();
@@ -5105,9 +5132,9 @@ public class TXJUnitTest {
txMgrImpl.begin();
this.region.get("key1"); // bootstrap the tx, entrySet does not
this.region.entrySet(false).iterator().next();
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.put("key1", "value2"); // non-tx
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
assertEquals("value1", this.region.get("key1"));
this.region.put("key1", "value3");
try {
@@ -5120,17 +5147,17 @@ public class TXJUnitTest {
this.region.put("key1", "value1"); // non-tx
txMgrImpl.begin();
assertEquals(true, this.region.containsKey("key1"));
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.remove("key1"); // non-tx
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
assertEquals(true, this.region.containsKey("key1"));
txMgrImpl.commit();
this.region.put("key1", "value1"); // non-tx
txMgrImpl.begin();
assertEquals(true, this.region.containsKey("key1"));
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.remove("key1"); // non-tx
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
assertEquals(true, this.region.containsKey("key1"));
this.region.put("key1", "value3");
assertEquals(true, this.region.containsKey("key1"));
@@ -5143,17 +5170,17 @@ public class TXJUnitTest {
this.region.put("key1", "value1"); // non-tx
txMgrImpl.begin();
assertEquals(true, this.region.containsValueForKey("key1"));
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.remove("key1"); // non-tx
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
assertEquals(true, this.region.containsValueForKey("key1"));
txMgrImpl.commit();
this.region.put("key1", "value1"); // non-tx
txMgrImpl.begin();
assertEquals(true, this.region.containsValueForKey("key1"));
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.remove("key1"); // non-tx
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
assertEquals(true, this.region.containsValueForKey("key1"));
this.region.put("key1", "value3");
assertEquals(true, this.region.containsValueForKey("key1"));
@@ -5169,9 +5196,9 @@ public class TXJUnitTest {
this.region.remove("key1"); // non-tx
txMgrImpl.begin();
assertEquals(null, this.region.get("key1"));
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.put("key1", "value2"); // non-tx
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
assertEquals(null, this.region.get("key1"));
txMgrImpl.commit();
@@ -5180,9 +5207,9 @@ public class TXJUnitTest {
this.region.remove("key1"); // non-tx
txMgrImpl.begin();
assertEquals(null, this.region.get("key1"));
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.put("key1", "value2"); // non-tx
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
assertEquals(null, this.region.get("key1"));
this.region.put("key1", "value3");
assertEquals("value3", this.region.get("key1"));
@@ -5196,9 +5223,9 @@ public class TXJUnitTest {
this.region.remove("key1"); // non-tx
txMgrImpl.begin();
assertEquals(null, this.region.getEntry("key1"));
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.put("key1", "value2"); // non-tx
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
assertEquals(null, this.region.getEntry("key1"));
txMgrImpl.commit();
@@ -5207,9 +5234,9 @@ public class TXJUnitTest {
this.region.remove("key1"); // non-tx
txMgrImpl.begin();
assertEquals(null, this.region.getEntry("key1"));
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.put("key1", "value2"); // non-tx
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
assertEquals(null, this.region.getEntry("key1"));
this.region.put("key1", "value3");
try {
@@ -5222,17 +5249,17 @@ public class TXJUnitTest {
this.region.remove("key1"); // non-tx
txMgrImpl.begin();
assertEquals(false, this.region.containsKey("key1"));
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.put("key1", "value2"); // non-tx
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
assertEquals(false, this.region.containsKey("key1"));
txMgrImpl.commit();
this.region.remove("key1"); // non-tx
txMgrImpl.begin();
assertEquals(false, this.region.containsKey("key1"));
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.put("key1", "value2"); // non-tx
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
assertEquals(false, this.region.containsKey("key1"));
this.region.put("key1", "value3");
assertEquals(true, this.region.containsKey("key1"));
@@ -5246,17 +5273,17 @@ public class TXJUnitTest {
this.region.remove("key1"); // non-tx
txMgrImpl.begin();
assertEquals(false, this.region.containsValueForKey("key1"));
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.put("key1", "value2"); // non-tx
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
assertEquals(false, this.region.containsValueForKey("key1"));
txMgrImpl.commit();
this.region.remove("key1"); // non-tx
txMgrImpl.begin();
assertEquals(false, this.region.containsValueForKey("key1"));
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.put("key1", "value2"); // non-tx
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
assertEquals(false, this.region.containsValueForKey("key1"));
this.region.put("key1", "value3");
assertEquals(true, this.region.containsValueForKey("key1"));
@@ -5272,9 +5299,9 @@ public class TXJUnitTest {
txMgrImpl.begin();
this.region.get("key1");
this.region.localInvalidate("key1"); // should be a noop since it is already invalid
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.remove("key1"); // non-tx
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
txMgrImpl.commit();
assertEquals(false, this.region.containsKey("key1"));
@@ -5283,9 +5310,9 @@ public class TXJUnitTest {
this.region.create("key1", null); // non-tx
txMgrImpl.begin();
this.region.localInvalidate("key1"); // should be a noop since it is already invalid
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.remove("key1"); // non-tx
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
assertEquals(true, this.region.containsKey("key1"));
assertEquals(false, this.region.containsValueForKey("key1"));
txMgrImpl.commit();
@@ -5299,9 +5326,9 @@ public class TXJUnitTest {
fail("expected EntryNotFoundException");
} catch (EntryNotFoundException expected) {
}
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.create("key1", "value1"); // non-tx
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
assertEquals(false, this.region.containsKey("key1"));
txMgrImpl.commit();
assertEquals(true, this.region.containsKey("key1"));
@@ -5315,9 +5342,9 @@ public class TXJUnitTest {
fail("expected EntryExistsException");
} catch (EntryExistsException expected) {
}
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.remove("key1"); // non-tx
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
assertEquals(true, this.region.containsKey("key1"));
txMgrImpl.commit();
assertEquals(false, this.region.containsKey("key1"));
@@ -5337,9 +5364,9 @@ public class TXJUnitTest {
// now try a put with a conflict and make sure it is detected
txMgrImpl.begin();
this.region.put("key1", "value1");
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.put("key1", "value2"); // do a non-tx put to force conflict
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
try {
txMgrImpl.commit();
fail("expected CommitConflictException");
@@ -5360,9 +5387,9 @@ public class TXJUnitTest {
this.region.put("key1", "value0");
txMgrImpl.begin();
this.region.put("key1", "value1");
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.put("key1", "value2"); // do a non-tx put to force conflict
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
try {
txMgrImpl.commit();
fail("expected CommitConflictException");
@@ -5381,9 +5408,9 @@ public class TXJUnitTest {
// now try a create with a conflict and make sure it is detected
txMgrImpl.begin();
this.region.create("key1", "value1");
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.put("key1", "value2"); // do a non-tx put to force conflict
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
try {
txMgrImpl.commit();
fail("expected CommitConflictException");
@@ -5404,9 +5431,9 @@ public class TXJUnitTest {
this.region.put("key1", "value0");
txMgrImpl.begin();
this.region.localInvalidate("key1");
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.put("key1", "value2"); // do a non-tx put to force conflict
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
try {
txMgrImpl.commit();
fail("expected CommitConflictException");
@@ -5427,9 +5454,9 @@ public class TXJUnitTest {
this.region.put("key1", "value0");
txMgrImpl.begin();
this.region.invalidate("key1");
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.put("key1", "value2"); // do a non-tx put to force conflict
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
try {
txMgrImpl.commit();
fail("expected CommitConflictException");
@@ -5441,9 +5468,9 @@ public class TXJUnitTest {
// check C + DD is a NOOP that still gets conflict if non-tx entry created */
this.txMgr.begin();
this.region.create("newKey", "valueTX");
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.create("newKey", "valueNONTX");
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
this.region.destroy("newKey");
assertTrue(!this.region.containsKey("key1"));
try {
@@ -5457,9 +5484,9 @@ public class TXJUnitTest {
// check C + LD is a NOOP that still gets conflict if non-tx entry created */
this.txMgr.begin();
this.region.create("newKey", "valueTX");
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.create("newKey", "valueNONTX");
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
this.region.localDestroy("newKey");
assertTrue(!this.region.containsKey("key1"));
try {
@@ -5481,9 +5508,9 @@ public class TXJUnitTest {
this.region.put("key1", "value0");
txMgrImpl.begin();
this.region.localDestroy("key1");
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.put("key1", "value2"); // do a non-tx put to force conflict
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
try {
txMgrImpl.commit();
fail("expected CommitConflictException");
@@ -5503,9 +5530,9 @@ public class TXJUnitTest {
this.region.put("key1", "value0");
txMgrImpl.begin();
this.region.destroy("key1");
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.put("key1", "value2"); // do a non-tx put to force conflict
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
try {
txMgrImpl.commit();
fail("expected CommitConflictException");
@@ -5522,9 +5549,9 @@ public class TXJUnitTest {
this.region.localInvalidate("key1");
txMgrImpl.begin();
this.region.put("key1", "txVal1");
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
this.region.invalidate("key1");
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
txMgrImpl.commit();
assertEquals("txVal1", this.region.getEntry("key1").getValue());
this.region.destroy("key1");
@@ -5532,9 +5559,9 @@ public class TXJUnitTest {
// now try a put and a region destroy.
txMgrImpl.begin();
this.region.create("key1", "value1");
- TXStateProxy tis = txMgrImpl.internalSuspend();
+ TXStateProxy tis = txMgrImpl.pauseTransaction();
this.region.localDestroyRegion(); // non-tx
- txMgrImpl.internalResume(tis);
+ txMgrImpl.unpauseTransaction(tis);
try {
txMgrImpl.commit();
@@ -5800,7 +5827,7 @@ public class TXJUnitTest {
lruRegion.put("key" + i, new Long(i));
}
assertLRUEntries(lruRegion.entrySet(false), numToPut, "key", LRUENTRY_LONG);
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
assertLRUEntries(lruRegion.entrySet(false), lruSize, "key", LRUENTRY_INTEGER);
for (int i = 0; i < numToPut; ++i) {
@@ -5809,7 +5836,7 @@ public class TXJUnitTest {
assertLRUEntries(lruRegion.entrySet(false), lruSize, "key", LRUENTRY_INTEGER);
assertNull(lruRegion.get("non-tx key0"));
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
this.txMgr.commit();
assertLRUEntries(lruRegion.entrySet(false), lruSize, "key", LRUENTRY_LONG);
}
@@ -5841,7 +5868,7 @@ public class TXJUnitTest {
lruRegion.get("key" + i, new Integer(i));
}
assertLRUEntries(lruRegion.entrySet(false), numToPut, "key", LRUENTRY_STRING);
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
assertEquals(lruSize, lruRegion.entrySet(false).size());
assertLRUEntries(lruRegion.entrySet(false), lruSize, "key", LRUENTRY_NULL);
@@ -5851,7 +5878,7 @@ public class TXJUnitTest {
assertLRUEntries(lruRegion.entrySet(false), lruSize, "key", LRUENTRY_NULL);
assertNull(lruRegion.getEntry("non-tx key0"));
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
this.txMgr.commit();
assertLRUEntries(lruRegion.entrySet(false), lruSize, "key", LRUENTRY_STRING);
Iterator it = lruRegion.keySet().iterator();
@@ -5868,7 +5895,7 @@ public class TXJUnitTest {
// eviction, force TX eviction
{
final TXManagerImpl txMgrImpl = (TXManagerImpl) this.txMgr;
- TXStateProxy tx1, tx2;
+ TransactionId txId1, txId2;
numToPut = lruSize + 4;
assertEquals(0, lruRegion.entrySet(false).size());
// Create entries
@@ -5884,7 +5911,7 @@ public class TXJUnitTest {
lruRegion.put("key" + i, new Long(i));
}
assertLRUEntries(lruRegion.entrySet(false), numToPut, "key", LRUENTRY_LONG);
- tx1 = txMgrImpl.internalSuspend();
+ txId1 = txMgrImpl.suspend();
this.txMgr.begin();
assertLRUEntries(lruRegion.entrySet(false), lruSize, "key", LRUENTRY_INTEGER);
@@ -5893,7 +5920,7 @@ public class TXJUnitTest {
lruRegion.put("key" + i, new Double(i));
}
assertLRUEntries(lruRegion.entrySet(false), numToPut, "key", LRUENTRY_DOUBLE);
- tx2 = txMgrImpl.internalSuspend();
+ txId2 = txMgrImpl.suspend();
assertLRUEntries(lruRegion.entrySet(false), lruSize, "key", LRUENTRY_INTEGER);
@@ -5906,13 +5933,13 @@ public class TXJUnitTest {
assertNull(lruRegion.get("non-tx key0"));
assertLRUEntries(lruRegion.entrySet(false), lruSize, "key", LRUENTRY_INTEGER);
- txMgrImpl.internalResume(tx1);
+ txMgrImpl.resume(txId1);
assertLRUEntries(lruRegion.entrySet(false), numToPut, "key", LRUENTRY_LONG);
// Check to make sure no conflict was caused by non-TX put evictions
// This should remove all references for each committed entry
this.txMgr.commit();
assertLRUEntries(lruRegion.entrySet(false), lruSize, "key", LRUENTRY_LONG);
- txMgrImpl.internalResume(tx2);
+ txMgrImpl.resume(txId2);
assertLRUEntries(lruRegion.entrySet(false), numToPut, "key", LRUENTRY_DOUBLE);
this.txMgr.rollback();
assertLRUEntries(lruRegion.entrySet(false), lruSize, "key", LRUENTRY_LONG);
@@ -5946,7 +5973,7 @@ public class TXJUnitTest {
lruRegion.put("key" + i, new Long(i));
}
assertLRUEntries(lruRegion.entrySet(false), numToPut, "key", LRUENTRY_LONG);
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
assertLRUEntries(lruRegion.entrySet(false), lruSize, "key", LRUENTRY_INTEGER);
// Force the Non-Tx "put" to remove each attempt since region is full
@@ -5957,7 +5984,7 @@ public class TXJUnitTest {
assertNull(lruRegion.get("non-tx key0"));
assertLRUEntries(lruRegion.entrySet(false), lruSize, "key", LRUENTRY_INTEGER);
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
assertLRUEntries(lruRegion.entrySet(false), numToPut, "key", LRUENTRY_LONG);
// This should remove all references for each committed entry
this.txMgr.rollback();
@@ -5992,12 +6019,12 @@ public class TXJUnitTest {
lruRegion.put("key" + i, new Long(i));
}
assertLRUEntries(lruRegion.entrySet(false), numToPut, "key", LRUENTRY_LONG);
- tx = txMgrImpl.internalSuspend();
+ tx = txMgrImpl.pauseTransaction();
// Cause a conflict
lruRegion.put("key" + (numToPut - 1), new Integer(numToPut - 1));
- txMgrImpl.internalResume(tx);
+ txMgrImpl.unpauseTransaction(tx);
assertLRUEntries(lruRegion.entrySet(false), numToPut, "key", LRUENTRY_LONG);
// This should remove all references for each committed entry
try {
@@ -6102,12 +6129,12 @@ public class TXJUnitTest {
this.region.put("syncKey3", "syncVal3");
assertEquals("syncVal3", this.region.getEntry("syncKey3").getValue());
- TXStateProxy gfTx = gfTxMgrImpl.internalSuspend();
+ TXStateProxy gfTx = gfTxMgrImpl.pauseTransaction();
javax.transaction.Transaction jtaTx = jtaTxMgr.suspend();
assertNull(jtaTxMgr.getTransaction());
this.region.put("syncKey3", "syncVal4");
assertEquals("syncVal4", this.region.getEntry("syncKey3").getValue());
- gfTxMgrImpl.internalResume(gfTx);
+ gfTxMgrImpl.unpauseTransaction(gfTx);
try {
jtaTxMgr.resume(jtaTx);
} catch (Exception failure) {
@@ -6346,7 +6373,7 @@ public class TXJUnitTest {
assertNotNull(this.txMgr.getTransactionId());
{
TXManagerImpl gfTxMgrImpl = (TXManagerImpl) this.txMgr;
- TXStateProxy gfTx = gfTxMgrImpl.internalSuspend();
+ TXStateProxy gfTx = gfTxMgrImpl.pauseTransaction();
javax.transaction.TransactionManager jtaTxMgr = this.cache.getJTATransactionManager();
javax.transaction.Transaction jtaTx = jtaTxMgr.suspend();
@@ -6359,7 +6386,7 @@ public class TXJUnitTest {
} catch (Exception failure) {
fail("JTA resume failed");
}
- gfTxMgrImpl.internalResume(gfTx);
+ gfTxMgrImpl.unpauseTransaction(gfTx);
}
assertEquals("enlistVal", this.region.get("enlistKey"));
try {
@@ -6804,10 +6831,10 @@ public class TXJUnitTest {
ctm.begin();
pr.putAll(map);
r.putAll(map);
- TXStateProxy tx = ctm.internalSuspend();
+ TXStateProxy tx = ctm.pauseTransaction();
assertTrue(!pr.containsKey("stuff"));
assertTrue(!r.containsKey("stuff"));
- ctm.internalResume(tx);
+ ctm.unpauseTransaction(tx);
ctm.commit();
assertTrue(pr.containsKey("stuff"));
assertTrue(r.containsKey("stuff"));
diff --git a/geode-core/src/test/java/org/apache/geode/cache30/MultiVMRegionTestCase.java b/geode-core/src/test/java/org/apache/geode/cache30/MultiVMRegionTestCase.java
index 79f1ee9..f014480 100644
--- a/geode-core/src/test/java/org/apache/geode/cache30/MultiVMRegionTestCase.java
+++ b/geode-core/src/test/java/org/apache/geode/cache30/MultiVMRegionTestCase.java
@@ -6314,10 +6314,10 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase {
vm0.invoke(create);
{
- TXStateProxy tx = ((TXManagerImpl) txMgr).internalSuspend();
+ TXStateProxy tx = ((TXManagerImpl) txMgr).pauseTransaction();
assertTrue(rgn.containsKey("key"));
assertEquals("LV 1", rgn.getEntry("key").getValue());
- ((TXManagerImpl) txMgr).internalResume(tx);
+ ((TXManagerImpl) txMgr).unpauseTransaction(tx);
}
// make sure transactional view is still correct
assertEquals("txValue", rgn.getEntry("key").getValue());
@@ -6362,11 +6362,11 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase {
rgn.create("key3", "txValue3");
{
- TXStateProxy tx = ((TXManagerImpl) txMgr).internalSuspend();
+ TXStateProxy tx = ((TXManagerImpl) txMgr).pauseTransaction();
// do a get outside of the transaction to force a net load
Object v3 = rgn.get("key3");
assertEquals("LV 3", v3);
- ((TXManagerImpl) txMgr).internalResume(tx);
+ ((TXManagerImpl) txMgr).unpauseTransaction(tx);
}
// make sure transactional view is still correct
assertEquals("txValue3", rgn.getEntry("key3").getValue());
@@ -6420,11 +6420,11 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase {
rgn.put("key", "new txValue");
{
- TXStateProxy tx = ((TXManagerImpl) txMgr).internalSuspend();
+ TXStateProxy tx = ((TXManagerImpl) txMgr).pauseTransaction();
// do a get outside of the transaction to force a netsearch
assertEquals("txValue", rgn.get("key")); // does a netsearch
assertEquals("txValue", rgn.getEntry("key").getValue());
- ((TXManagerImpl) txMgr).internalResume(tx);
+ ((TXManagerImpl) txMgr).unpauseTransaction(tx);
}
// make sure transactional view is still correct
assertEquals("new txValue", rgn.getEntry("key").getValue());
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
index c7ae750..035eb06 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
@@ -725,7 +725,7 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
}
org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("suspending transaction");
if (!useJTA) {
- TXStateProxy tx = mgr.internalSuspend();
+ TXStateProxy tx = mgr.pauseTransaction();
if (prePopulateData) {
for (int i = 0; i < 5; i++) {
CustId custId = new CustId(i);
@@ -739,7 +739,7 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
assertNull(pr.get(new CustId(i)));
}
org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("resuming transaction");
- mgr.internalResume(tx);
+ mgr.unpauseTransaction(tx);
}
assertEquals("r sized should be " + MAX_ENTRIES + " but it is:" + r.size(), MAX_ENTRIES,
r.size());
@@ -838,9 +838,9 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
client.invoke(new SerializableCallable() {
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
- TXStateProxy tx = mgr.internalSuspend();
+ TXStateProxy tx = mgr.pauseTransaction();
assertNotNull(tx);
- mgr.internalResume(tx);
+ mgr.unpauseTransaction(tx);
if (commit) {
mgr.commit();
} else {
@@ -1620,10 +1620,10 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
mgr.begin();
pr.put(custId, new Customer("name10", "address10"));
r.put(10, "value10");
- TXStateProxy txState = mgr.internalSuspend();
+ TXStateProxy txState = mgr.pauseTransaction();
assertNull(pr.get(custId));
assertNull(r.get(10));
- mgr.internalResume(txState);
+ mgr.unpauseTransaction(txState);
mgr.commit();
return null;
}
@@ -2369,12 +2369,12 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
assertEquals(new Customer("name0", "address0"), pr.get(new CustId(0)));
assertEquals(new Customer("name10", "address10"), pr.get(new CustId(10)));
assertEquals(new Customer("name10", "address10"), r.get(new CustId(10)));
- TXStateProxy tx = mgr.internalSuspend();
+ TXStateProxy tx = mgr.pauseTransaction();
assertEquals(new Customer("oldname0", "oldaddress0"), pr.get(new CustId(0)));
assertEquals(new Customer("oldname1", "oldaddress1"), pr.get(new CustId(1)));
assertNull(pr.get(new CustId(10)));
assertNull(r.get(new CustId(10)));
- mgr.internalResume(tx);
+ mgr.unpauseTransaction(tx);
mgr.commit();
assertEquals(new Customer("name0", "address0"), pr.get(new CustId(0)));
assertEquals(new Customer("name1", "address1"), pr.get(new CustId(1)));
@@ -2435,13 +2435,14 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
mgr.begin();
pr.put(custId, new Customer("name10", "address10"));
r.put(10, "value10");
- final TXStateProxy txState = mgr.internalSuspend();
+
+ final TransactionId txId = mgr.suspend();
assertNull(pr.get(custId));
assertNull(r.get(10));
final CountDownLatch latch = new CountDownLatch(1);
Thread t = new Thread(new Runnable() {
public void run() {
- mgr.internalResume(txState);
+ mgr.resume(txId);
mgr.commit();
latch.countDown();
}
@@ -2916,7 +2917,8 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
Map<CustId, Customer> m = new HashMap<CustId, Customer>();
m.put(new CustId(2), new Customer("name2", "address2"));
r.putAll(m);
- TXStateProxyImpl tx = (TXStateProxyImpl) mgr.internalSuspend();
+ TXStateProxyImpl tx = (TXStateProxyImpl) mgr.getTXState();
+ TransactionId txId = mgr.suspend();
ClientTXStateStub txStub = (ClientTXStateStub) tx.getRealDeal(null, null);
txStub.setAfterLocalLocks(new Runnable() {
public void run() {
@@ -2928,7 +2930,7 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
}
}
});
- mgr.internalResume(tx);
+ mgr.resume(txId);
mgr.commit();
}
});
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/RemoteTransactionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/RemoteTransactionDUnitTest.java
index 3c4908c..84d3e9d 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/RemoteTransactionDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/RemoteTransactionDUnitTest.java
@@ -553,9 +553,9 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
accessor.invoke(new SerializableCallable() {
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
- TXStateProxy tx = mgr.internalSuspend();
+ TXStateProxy tx = mgr.pauseTransaction();
assertNotNull(tx);
- mgr.internalResume(tx);
+ mgr.unpauseTransaction(tx);
if (commit) {
mgr.commit();
} else {
@@ -660,9 +660,9 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
Customer s = (Customer) cust.get(new CustId(8));
assertEquals(new Customer("sup dawg", "add"), s);
assertTrue(cust.containsKey(new CustId(8)));
- TXStateProxy tx = ((TXManagerImpl) mgr).internalSuspend();
+ TXStateProxy tx = ((TXManagerImpl) mgr).pauseTransaction();
assertFalse(cust.containsKey(new CustId(8)));
- ((TXManagerImpl) mgr).internalResume(tx);
+ ((TXManagerImpl) mgr).unpauseTransaction(tx);
mgr.commit();
Customer s2 = (Customer) cust.get(new CustId(8));
Customer ex = new Customer("sup dawg", "add");
@@ -932,10 +932,10 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
assertNull(oldOrder);
assertNotNull(cust.get(newCustId));
assertNotNull(rr.get(newCustId));
- TXStateProxy tx = mgr.internalSuspend();
+ TXStateProxy tx = mgr.pauseTransaction();
assertNull(cust.get(newCustId));
assertNull(rr.get(newCustId));
- mgr.internalResume(tx);
+ mgr.unpauseTransaction(tx);
cust.put(oldCustId, new Customer("foo", "bar"));
rr.put(oldCustId, new Customer("foo", "bar"));
return mgr.getTransactionId();
@@ -978,9 +978,9 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
mgr.begin();
CustId conflictCust = new CustId(11);
cust.putIfAbsent(conflictCust, new Customer("name11", "address11"));
- TXStateProxy tx = mgr.internalSuspend();
+ TXStateProxy tx = mgr.pauseTransaction();
cust.put(conflictCust, new Customer("foo", "bar"));
- mgr.internalResume(tx);
+ mgr.unpauseTransaction(tx);
try {
mgr.commit();
fail("expected exception not thrown");
@@ -1017,10 +1017,10 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
assertTrue(cust.remove(custId, customer));
assertFalse(ref.remove(custId, fakeCust));
assertTrue(ref.remove(custId, customer));
- TXStateProxy tx = mgr.internalSuspend();
+ TXStateProxy tx = mgr.pauseTransaction();
assertNotNull(cust.get(custId));
assertNotNull(ref.get(custId));
- mgr.internalResume(tx);
+ mgr.unpauseTransaction(tx);
return mgr.getTransactionId();
}
});
@@ -1058,9 +1058,9 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
Customer customer = new Customer("customer2", "address2");
getGemfireCache().getLoggerI18n().fine("SWAP:removeConflict");
assertTrue(cust.remove(conflictCust, customer));
- TXStateProxy tx = mgr.internalSuspend();
+ TXStateProxy tx = mgr.pauseTransaction();
cust.put(conflictCust, new Customer("foo", "bar"));
- mgr.internalResume(tx);
+ mgr.unpauseTransaction(tx);
try {
mgr.commit();
fail("expected exception not thrown");
@@ -1093,10 +1093,10 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
Customer fakeCust = new Customer("foo2", "bar2");
cust.removeAll(Arrays.asList(custId1, custId2, custId20));
ref.removeAll(Arrays.asList(custId1, custId2, custId20));
- TXStateProxy tx = mgr.internalSuspend();
+ TXStateProxy tx = mgr.pauseTransaction();
assertNotNull(cust.get(custId1));
assertNotNull(ref.get(custId2));
- mgr.internalResume(tx);
+ mgr.unpauseTransaction(tx);
return mgr.getTransactionId();
}
});
@@ -1136,12 +1136,12 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
CustId custId4 = new CustId(4);
getGemfireCache().getLoggerI18n().fine("SWAP:removeConflict");
cust.removeAll(Arrays.asList(custId3, custId20, custId4));
- TXStateProxy tx = mgr.internalSuspend();
+ TXStateProxy tx = mgr.pauseTransaction();
// cust.put(custId3, new Customer("foo", "bar"));
cust.put(custId20, new Customer("foo", "bar"));
assertNotNull(cust.get(custId20));
cust.put(custId4, new Customer("foo", "bar"));
- mgr.internalResume(tx);
+ mgr.unpauseTransaction(tx);
try {
mgr.commit();
fail("expected exception not thrown");
@@ -1156,9 +1156,9 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
mgr.begin();
getGemfireCache().getLoggerI18n().fine("SWAP:removeConflict");
cust.removeAll(Arrays.asList(custId2, custId3));
- tx = mgr.internalSuspend();
+ tx = mgr.pauseTransaction();
cust.put(custId2, new Customer("foo", "bar"));
- mgr.internalResume(tx);
+ mgr.unpauseTransaction(tx);
mgr.commit();
assertNotNull(cust.get(custId2));
assertNull(cust.get(custId3));
@@ -1283,10 +1283,10 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
assertTrue(cust.replace(custId, customer, updatedCust));
assertFalse(ref.replace(custId, fakeCust, updatedCust));
assertTrue(ref.replace(custId, customer, updatedCust));
- TXStateProxy tx = mgr.internalSuspend();
+ TXStateProxy tx = mgr.pauseTransaction();
assertEquals(cust.get(custId), customer);
assertEquals(ref.get(custId), customer);
- mgr.internalResume(tx);
+ mgr.unpauseTransaction(tx);
return mgr.getTransactionId();
}
});
@@ -1324,9 +1324,9 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
Customer customer = new Customer("customer2", "address2");
getGemfireCache().getLoggerI18n().fine("SWAP:removeConflict");
assertTrue(cust.replace(conflictCust, customer, new Customer("conflict", "conflict")));
- TXStateProxy tx = mgr.internalSuspend();
+ TXStateProxy tx = mgr.pauseTransaction();
cust.put(conflictCust, new Customer("foo", "bar"));
- mgr.internalResume(tx);
+ mgr.unpauseTransaction(tx);
try {
mgr.commit();
fail("expected exception not thrown");
@@ -2205,11 +2205,11 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
assertEquals(6, i);
assertEquals(6, rr.keySet().size());
assertNotNull(rr.get(custId));
- TXStateProxy tx = mgr.internalSuspend();
+ TXStateProxy tx = mgr.pauseTransaction();
assertEquals(getCustIdSet(5), rr.keySet());
assertEquals(5, rr.keySet().size());
assertNull(rr.get(custId));
- mgr.internalResume(tx);
+ mgr.unpauseTransaction(tx);
mgr.commit();
return null;
}
@@ -2255,11 +2255,11 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
assertEquals(6, i);
assertEquals(6, rr.values().size());
assertNotNull(rr.get(custId));
- TXStateProxy tx = mgr.internalSuspend();
+ TXStateProxy tx = mgr.pauseTransaction();
assertEquals(getCustomerSet(5), rr.values());
assertEquals(5, rr.values().size());
assertNull(rr.get(custId));
- mgr.internalResume(tx);
+ mgr.unpauseTransaction(tx);
mgr.commit();
return null;
}
@@ -2305,11 +2305,11 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
assertEquals(6, i);
assertEquals(6, rr.entrySet().size());
assertNotNull(rr.get(custId));
- TXStateProxy tx = mgr.internalSuspend();
+ TXStateProxy tx = mgr.pauseTransaction();
// assertIndexDetailsEquals(getCustIdSet(5), rr.entrySet());
assertEquals(5, rr.entrySet().size());
assertNull(rr.get(custId));
- mgr.internalResume(tx);
+ mgr.unpauseTransaction(tx);
mgr.commit();
return null;
}
@@ -2522,10 +2522,10 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
FunctionService.onMember(owner).execute(TXFunction.id).getResult();
break;
}
- TXStateProxy tx = mgr.internalSuspend();
+ TXStateProxy tx = mgr.pauseTransaction();
GemFireCacheImpl.getInstance().getLogger().warning("TX SUSPENDO:" + tx);
assertNull(custRegion.get(expectedCustId));
- mgr.internalResume(tx);
+ mgr.unpauseTransaction(tx);
return null;
}
});
@@ -2568,9 +2568,9 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
FunctionService.onMember(owner).execute(TXFunction.id).getResult();
break;
}
- TXStateProxy tx = mgr.internalSuspend();
+ TXStateProxy tx = mgr.pauseTransaction();
custRegion.put(expectedCustId, new Customer("Cust6", "updated6"));
- mgr.internalResume(tx);
+ mgr.unpauseTransaction(tx);
try {
mgr.commit();
fail("expected commit conflict not thrown");
@@ -2714,12 +2714,12 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
mgr.begin();
FunctionService.onRegion(custRegion).execute(TXFunction.id).getResult();
assertNotNull(mgr.getTXState());
- TXStateProxy tx = mgr.internalSuspend();
+ TXStateProxy tx = mgr.pauseTransaction();
assertNull(mgr.getTXState());
getGemfireCache().getLogger().fine("SWAP:callingget");
assertNull("expected null but was:" + custRegion.get(expectedCustId),
custRegion.get(expectedCustId));
- mgr.internalResume(tx);
+ mgr.unpauseTransaction(tx);
mgr.commit();
assertEquals(expectedCustomer, custRegion.get(expectedCustId));
return null;
@@ -2744,9 +2744,9 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
context.getResultSender().lastResult(Boolean.TRUE);
}
}).getResult();
- TXStateProxy tx = mgr.internalSuspend();
+ TXStateProxy tx = mgr.pauseTransaction();
assertEquals(custRegion.get(expectedCustId), expectedCustomer);
- mgr.internalResume(tx);
+ mgr.unpauseTransaction(tx);
mgr.commit();
assertNull(custRegion.get(expectedCustId));
return null;
@@ -2787,9 +2787,9 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
filter.add(expectedCustId);
FunctionService.onRegion(custRegion).withFilter(filter).execute(TXFunction.id).getResult();
assertEquals(expectedCustomer, custRegion.get(expectedCustId));
- TXStateProxy tx = mgr.internalSuspend();
+ TXStateProxy tx = mgr.pauseTransaction();
assertNull(custRegion.get(expectedCustId));
- mgr.internalResume(tx);
+ mgr.unpauseTransaction(tx);
return null;
}
});
@@ -2839,9 +2839,9 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
}
FunctionService.onMember(owner).execute(TXFunction.id).getResult();
assertEquals(expectedCustomer, pr.get(expectedCustId));
- TXStateProxy tx = mgr.internalSuspend();
+ TXStateProxy tx = mgr.pauseTransaction();
assertNull(pr.get(expectedCustId));
- mgr.internalResume(tx);
+ mgr.unpauseTransaction(tx);
return null;
}
});
@@ -2878,9 +2878,9 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
filter.add(keyOnDs);
FunctionService.onRegion(pr).withFilter(filter).execute(TXFunction.id).getResult();
assertEquals(expectedCustomer, pr.get(expectedCustId));
- TXStateProxy tx = mgr.internalSuspend();
+ TXStateProxy tx = mgr.pauseTransaction();
assertNull(pr.get(expectedCustId));
- mgr.internalResume(tx);
+ mgr.unpauseTransaction(tx);
return null;
}
};
@@ -3092,9 +3092,9 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
accessor.invoke(new SerializableCallable() {
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
- TXStateProxy tx = mgr.internalSuspend();
+ TXStateProxy tx = mgr.pauseTransaction();
assertNotNull(tx);
- mgr.internalResume(tx);
+ mgr.unpauseTransaction(tx);
mgr.commit();
return null;
}
@@ -3105,9 +3105,9 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
accessor.invoke(new SerializableCallable() {
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
- TXStateProxy tx = mgr.internalSuspend();
+ TXStateProxy tx = mgr.pauseTransaction();
assertNotNull(tx);
- mgr.internalResume(tx);
+ mgr.unpauseTransaction(tx);
mgr.commit();
return null;
}
@@ -3119,9 +3119,9 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
accessor.invoke(new SerializableCallable() {
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
- TXStateProxy tx = mgr.internalSuspend();
+ TXStateProxy tx = mgr.pauseTransaction();
assertNotNull(tx);
- mgr.internalResume(tx);
+ mgr.unpauseTransaction(tx);
mgr.commit();
return null;
}
@@ -3163,9 +3163,9 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
accessor.invoke(new SerializableCallable() {
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
- TXStateProxy tx = mgr.internalSuspend();
+ TXStateProxy tx = mgr.pauseTransaction();
assertNotNull(tx);
- mgr.internalResume(tx);
+ mgr.unpauseTransaction(tx);
mgr.commit();
return null;
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TransactionsWithDeltaDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TransactionsWithDeltaDUnitTest.java
index c3aa056..e153605 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/TransactionsWithDeltaDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TransactionsWithDeltaDUnitTest.java
@@ -365,9 +365,9 @@ public class TransactionsWithDeltaDUnitTest extends JUnit4CacheTestCase {
LogWriterUtils.getLogWriter().info("SWAP:getfromtx:" + pr.get(cust1));
LogWriterUtils.getLogWriter().info("SWAP:doingCommit");
assertEquals("updatedName", pr.get(cust1).getName());
- TXStateProxy tx = mgr.internalSuspend();
+ TXStateProxy tx = mgr.pauseTransaction();
assertEquals("name1", pr.get(cust1).getName());
- mgr.internalResume(tx);
+ mgr.unpauseTransaction(tx);
mgr.commit();
assertTrue(c.isToDeltaCalled());
assertEquals(c, pr.get(cust1));
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/MyTransactionFunction.java b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/MyTransactionFunction.java
index 10e3282..bd023eb 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/MyTransactionFunction.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/MyTransactionFunction.java
@@ -25,6 +25,7 @@ import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.TransactionDataNotColocatedException;
import org.apache.geode.cache.TransactionDataRebalancedException;
+import org.apache.geode.cache.TransactionId;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionContext;
@@ -415,7 +416,7 @@ public class MyTransactionFunction implements Function {
TXManagerImpl mImp = (TXManagerImpl) mgr;
mImp.begin();
orderPR.put(vOrderId, vOrder);
- TXStateProxy txState = mImp.internalSuspend();
+ TXStateProxy txState = mImp.pauseTransaction();
Iterator it = txState.getRegions().iterator();
Assert.assertTrue(txState.getRegions().size() == 1,
"Expected 1 region; " + "found:" + txState.getRegions().size());
@@ -423,9 +424,10 @@ public class MyTransactionFunction implements Function {
Assert.assertTrue(lr instanceof BucketRegion);
TXRegionState txRegion = txState.readRegion(lr);
TXEntryState txEntry = txRegion.readEntry(txRegion.getEntryKeys().iterator().next());
- mImp.internalResume(txState);
+ mImp.unpauseTransaction(txState);
orderPR.put(vOrderId, new Order("foo"));
- txState = mImp.internalSuspend();
+ TransactionId txId = null;
+ txId = mImp.suspend();
// since both puts were on same key, verify that
// TxRegionState and TXEntryState are same
LocalRegion lr1 = (LocalRegion) txState.getRegions().iterator().next();
@@ -439,7 +441,7 @@ public class MyTransactionFunction implements Function {
orderPR.put(vOrderId, new Order("foobar"));
mImp.commit();
// now begin the first
- mImp.internalResume(txState);
+ mImp.resume(txId);
boolean caughtException = false;
try {
mImp.commit();
@@ -463,18 +465,18 @@ public class MyTransactionFunction implements Function {
mImp.begin();
custPR.put(custId, cust);
Assert.assertTrue(cust.equals(custPR.get(custId)));
- TXStateProxy txState = mImp.internalSuspend();
+ TXStateProxy txState = mImp.pauseTransaction();
Assert.assertTrue(custPR.get(custId) == null);
- mImp.internalResume(txState);
+ mImp.unpauseTransaction(txState);
mImp.commit();
// change value
mImp.begin();
Customer oldCust = (Customer) custPR.get(custId);
Assert.assertTrue(oldCust.equals(cust));
- txState = mImp.internalSuspend();
+ txState = mImp.pauseTransaction();
Customer newCust = new Customer("fooNew", "barNew");
custPR.put(custId, newCust);
- mImp.internalResume(txState);
+ mImp.unpauseTransaction(txState);
Assert.assertTrue(oldCust.equals(custPR.get(custId)));
mImp.commit();
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRSetOperationJTADUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRSetOperationJTADUnitTest.java
new file mode 100644
index 0000000..b5522e9
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRSetOperationJTADUnitTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.execute;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.naming.Context;
+import javax.transaction.UserTransaction;
+
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.Invoke;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+
+@Category(DistributedTest.class)
+@RunWith(JUnitParamsRunner.class)
+@SuppressWarnings("serial")
+public class PRSetOperationJTADUnitTest extends JUnit4CacheTestCase {
+
+ private static final Logger logger = LogService.getLogger();
+ private static final String REGION_NAME = "region1";
+
+ private Map<Long, String> testData;
+
+ private VM accessor = null;
+ private VM dataStore1 = null;
+ private VM dataStore2 = null;
+ private VM dataStore3 = null;
+
+ @Rule
+ public DistributedRestoreSystemProperties restoreSystemProperties =
+ new DistributedRestoreSystemProperties();
+
+ public PRSetOperationJTADUnitTest() {
+ super();
+ }
+
+ @Before
+ public void setup() {
+ testData = new HashMap<>();
+ testData.put(1L, "value1");
+ testData.put(2L, "value2");
+ testData.put(3L, "duplicateValue");
+ testData.put(4L, "duplicateValue");
+ }
+
+ @Override
+ public final void postSetUp() throws Exception {
+ disconnectAllFromDS(); // isolate this test from others to avoid periodic CacheExistsExceptions
+ Host host = Host.getHost(0);
+ dataStore1 = host.getVM(0);
+ dataStore2 = host.getVM(1);
+ dataStore3 = host.getVM(2);
+ accessor = host.getVM(3);
+ }
+
+ @Override
+ public final void preTearDownCacheTestCase() throws Exception {
+ Invoke.invokeInEveryVM(() -> verifyNoTxState());
+ }
+
+ @Test
+ @Parameters({"true", "false"})
+ public void testRegionKeysetWithJTA(boolean disableSetOpToStartJTA) throws Exception {
+ setupAndLoadRegion(disableSetOpToStartJTA);
+ verifyRegionKeysetWithJTA(disableSetOpToStartJTA);
+ }
+
+ @Test
+ @Parameters({"true", "false"})
+ public void testRegionValuesWithJTA(boolean disableSetOpToStartJTA) throws Exception {
+ setupAndLoadRegion(disableSetOpToStartJTA);
+ verifyRegionValuesWithJTA(disableSetOpToStartJTA);
+ }
+
+ @Test
+ @Parameters({"true", "false"})
+ public void testRegionEntriesWithJTA(boolean disableSetOpToStartJTA) throws Exception {
+ setupAndLoadRegion(disableSetOpToStartJTA);
+ verifyRegionEntriesWithJTA(disableSetOpToStartJTA);
+ }
+
+ private void setupAndLoadRegion(boolean disableSetOpToStartJTA) {
+ createRegion(disableSetOpToStartJTA);
+ dataStore1.invoke(() -> loadRegion());
+ }
+
+ private void createRegion(boolean disableSetOpToStartJTA) {
+ accessor.invoke(() -> createCache(disableSetOpToStartJTA));
+ dataStore1.invoke(() -> createCache(disableSetOpToStartJTA));
+ dataStore2.invoke(() -> createCache(disableSetOpToStartJTA));
+ dataStore3.invoke(() -> createCache(disableSetOpToStartJTA));
+
+ accessor.invoke(() -> createPR(true));
+ dataStore1.invoke(() -> createPR(false));
+ dataStore2.invoke(() -> createPR(false));
+ dataStore3.invoke(() -> createPR(false));
+ }
+
+ private void loadRegion() {
+ Region<Long, String> region = basicGetCache().getRegion(Region.SEPARATOR + REGION_NAME);
+ testData.forEach((k, v) -> region.put(k, v));
+ }
+
+ private void verifyRegionKeysetWithJTA(boolean disableSetOpToStartJTA) {
+ accessor.invoke(() -> verifyRegionKeysetWithJTA(disableSetOpToStartJTA, true));
+ dataStore1.invoke(() -> verifyRegionKeysetWithJTA(disableSetOpToStartJTA, false));
+ dataStore2.invoke(() -> verifyRegionKeysetWithJTA(disableSetOpToStartJTA, false));
+ dataStore3.invoke(() -> verifyRegionKeysetWithJTA(disableSetOpToStartJTA, false));
+ }
+
+ private void verifyRegionValuesWithJTA(boolean disableSetOpToStartJTA) {
+ accessor.invoke(() -> verifyRegionValuesWithJTA(disableSetOpToStartJTA, true));
+ dataStore1.invoke(() -> verifyRegionValuesWithJTA(disableSetOpToStartJTA, false));
+ dataStore2.invoke(() -> verifyRegionValuesWithJTA(disableSetOpToStartJTA, false));
+ dataStore3.invoke(() -> verifyRegionValuesWithJTA(disableSetOpToStartJTA, false));
+ }
+
+ private void verifyRegionEntriesWithJTA(boolean disableSetOpToStartJTA) {
+ accessor.invoke(() -> verifyRegionEntriesWithJTA(disableSetOpToStartJTA, true));
+ dataStore1.invoke(() -> verifyRegionEntriesWithJTA(disableSetOpToStartJTA, false));
+ dataStore2.invoke(() -> verifyRegionEntriesWithJTA(disableSetOpToStartJTA, false));
+ dataStore3.invoke(() -> verifyRegionEntriesWithJTA(disableSetOpToStartJTA, false));
+ }
+
+ private void verifyRegionKeysetWithJTA(boolean disableSetOpToStartJTA, boolean isAccessor)
+ throws Exception {
+ Context ctx = basicGetCache().getJNDIContext();
+ UserTransaction userTX = startUserTransaction(ctx);
+ Region<Long, String> region = basicGetCache().getRegion(Region.SEPARATOR + REGION_NAME);
+ try {
+ userTX.begin();
+ Collection<Long> set = region.keySet();
+ set.forEach((key) -> assertTrue(testData.keySet().contains(key)));
+ } finally {
+ validateTXManager(disableSetOpToStartJTA, isAccessor);
+ if (!disableSetOpToStartJTA && !isAccessor) {
+ userTX.rollback();
+ }
+ }
+ }
+
+ private void verifyRegionValuesWithJTA(boolean disableSetOpToStartJTA, boolean isAccessor)
+ throws Exception {
+ Context ctx = basicGetCache().getJNDIContext();
+ UserTransaction userTX = startUserTransaction(ctx);
+ Region<Long, String> region = basicGetCache().getRegion(Region.SEPARATOR + REGION_NAME);
+ try {
+ userTX.begin();
+ Collection<String> set = region.values();
+ set.forEach((value) -> assertTrue(testData.values().contains(value)));
+ } finally {
+ validateTXManager(disableSetOpToStartJTA, isAccessor);
+ if (!disableSetOpToStartJTA && !isAccessor) {
+ userTX.rollback();
+ }
+ }
+ }
+
+ private void verifyRegionEntriesWithJTA(boolean disableSetOpToStartJTA, boolean isAccessor)
+ throws Exception {
+ Context ctx = basicGetCache().getJNDIContext();
+ UserTransaction userTX = startUserTransaction(ctx);
+ Region<Long, String> region = basicGetCache().getRegion(Region.SEPARATOR + REGION_NAME);
+ try {
+ userTX.begin();
+ Collection<Map.Entry<Long, String>> set = region.entrySet();
+ set.forEach((entry) -> {
+ assertTrue(testData.values().contains(entry.getValue()));
+ assertTrue(testData.keySet().contains(entry.getKey()));
+ });
+ } finally {
+ validateTXManager(disableSetOpToStartJTA, isAccessor);
+ if (!disableSetOpToStartJTA && !isAccessor) {
+ userTX.rollback();
+ }
+ }
+ }
+
+ private void validateTXManager(boolean disableSetOpToStartTx, boolean isAccessor) {
+ if (disableSetOpToStartTx) {
+ assertNull(TXManagerImpl.getCurrentTXState());
+ } else {
+ assertNotNull(TXManagerImpl.getCurrentTXState());
+ if (!isAccessor) {
+ assertTrue(((TXStateProxyImpl) TXManagerImpl.getCurrentTXState()).hasRealDeal());
+ }
+ }
+ }
+
+ private UserTransaction startUserTransaction(Context ctx) throws Exception {
+ return (UserTransaction) ctx.lookup("java:/UserTransaction");
+ }
+
+ private void verifyNoTxState() {
+ TXManagerImpl mgr = getCache().getTxManager();
+ assertEquals(0, mgr.hostedTransactionsInProgressForTest());
+ }
+
+ final String restoreSetOperationTransactionBehavior = "restoreSetOperationTransactionBehavior";
+ final String RESTORE_SET_OPERATION_PROPERTY =
+ (System.currentTimeMillis() % 2 == 0 ? DistributionConfig.GEMFIRE_PREFIX : "geode.")
+ + restoreSetOperationTransactionBehavior;
+
+ private void createCache(boolean disableSetOpToStartJTA) {
+ if (disableSetOpToStartJTA) {
+ logger.info("setting system property {} to true ", RESTORE_SET_OPERATION_PROPERTY);
+ System.setProperty(RESTORE_SET_OPERATION_PROPERTY, "true");
+ }
+ getCache();
+ }
+
+ private void createPR(boolean isAccessor) {
+ basicGetCache().createRegionFactory(RegionShortcut.PARTITION)
+ .setPartitionAttributes(new PartitionAttributesFactory<Long, String>().setTotalNumBuckets(3)
+ .setLocalMaxMemory(isAccessor ? 0 : 1).create())
+ .create(REGION_NAME);
+ }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRSetOperationTXDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRSetOperationTXDUnitTest.java
new file mode 100644
index 0000000..791f3fe
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRSetOperationTXDUnitTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.execute;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.geode.cache.CacheTransactionManager;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.Invoke;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+
+@Category(DistributedTest.class)
+@RunWith(JUnitParamsRunner.class)
+@SuppressWarnings("serial")
+public class PRSetOperationTXDUnitTest extends JUnit4CacheTestCase {
+
+ private static final Logger logger = LogService.getLogger();
+ private static final String REGION_NAME = "region1";
+
+ private Map<Long, String> testData;
+
+ private VM accessor = null;
+ private VM dataStore1 = null;
+ private VM dataStore2 = null;
+ private VM dataStore3 = null;
+
+ @Rule
+ public DistributedRestoreSystemProperties restoreSystemProperties =
+ new DistributedRestoreSystemProperties();
+
+ public PRSetOperationTXDUnitTest() {
+ super();
+ }
+
+ @Before
+ public void setup() {
+ testData = new HashMap<>();
+ testData.put(1L, "value1");
+ testData.put(2L, "value2");
+ testData.put(3L, "duplicateValue");
+ testData.put(4L, "duplicateValue");
+ }
+
+ @Override
+ public final void postSetUp() throws Exception {
+ disconnectAllFromDS(); // isolate this test from others to avoid periodic CacheExistsExceptions
+ Host host = Host.getHost(0);
+ dataStore1 = host.getVM(0);
+ dataStore2 = host.getVM(1);
+ dataStore3 = host.getVM(2);
+ accessor = host.getVM(3);
+ }
+
+ @Override
+ public final void preTearDownCacheTestCase() throws Exception {
+ Invoke.invokeInEveryVM(() -> verifyNoTxState());
+ }
+
+ @Test
+ @Parameters({"true", "false"})
+ public void testRegionKeysetWithTx(boolean disableSetOpToStartTx) throws Exception {
+ setupAndLoadRegion(disableSetOpToStartTx);
+ verifyRegionKeysetWithTx(disableSetOpToStartTx);
+ }
+
+ @Test
+ @Parameters({"true", "false"})
+ public void testRegionValuesWithTx(boolean disableSetOpToStartTx) throws Exception {
+ setupAndLoadRegion(disableSetOpToStartTx);
+ verifyRegionValuesWithTx(disableSetOpToStartTx);
+ }
+
+ @Test
+ @Parameters({"true", "false"})
+ public void testRegionEntriesWithTx(boolean disableSetOpToStartTx) throws Exception {
+ setupAndLoadRegion(disableSetOpToStartTx);
+ verifyRegionEntriesWithTx(disableSetOpToStartTx);
+ }
+
+ private void setupAndLoadRegion(boolean disableSetOpToStartTx) {
+ createRegion(disableSetOpToStartTx);
+ dataStore1.invoke(() -> loadRegion());
+ }
+
+ private void createRegion(boolean disableSetOpToStartTx) {
+ accessor.invoke(() -> createCache(disableSetOpToStartTx));
+ dataStore1.invoke(() -> createCache(disableSetOpToStartTx));
+ dataStore2.invoke(() -> createCache(disableSetOpToStartTx));
+ dataStore3.invoke(() -> createCache(disableSetOpToStartTx));
+
+ accessor.invoke(() -> createPR(true));
+ dataStore1.invoke(() -> createPR(false));
+ dataStore2.invoke(() -> createPR(false));
+ dataStore3.invoke(() -> createPR(false));
+ }
+
+ private void loadRegion() {
+ Region<Long, String> region = basicGetCache().getRegion(Region.SEPARATOR + REGION_NAME);
+ testData.forEach((k, v) -> region.put(k, v));
+ }
+
+ private void verifyRegionKeysetWithTx(boolean disableSetOpToStartTx) {
+ accessor.invoke(() -> verifyRegionKeysetWithTx(disableSetOpToStartTx, true));
+ dataStore1.invoke(() -> verifyRegionKeysetWithTx(disableSetOpToStartTx, false));
+ dataStore2.invoke(() -> verifyRegionKeysetWithTx(disableSetOpToStartTx, false));
+ dataStore3.invoke(() -> verifyRegionKeysetWithTx(disableSetOpToStartTx, false));
+ }
+
+ private void verifyRegionValuesWithTx(boolean disableSetOpToStartTx) {
+ accessor.invoke(() -> verifyRegionValuesWithTx(disableSetOpToStartTx, true));
+ dataStore1.invoke(() -> verifyRegionValuesWithTx(disableSetOpToStartTx, false));
+ dataStore2.invoke(() -> verifyRegionValuesWithTx(disableSetOpToStartTx, false));
+ dataStore3.invoke(() -> verifyRegionValuesWithTx(disableSetOpToStartTx, false));
+ }
+
+ private void verifyRegionEntriesWithTx(boolean disableSetOpToStartTx) {
+ accessor.invoke(() -> verifyRegionEntriesWithTx(disableSetOpToStartTx, true));
+ dataStore1.invoke(() -> verifyRegionEntriesWithTx(disableSetOpToStartTx, false));
+ dataStore2.invoke(() -> verifyRegionEntriesWithTx(disableSetOpToStartTx, false));
+ dataStore3.invoke(() -> verifyRegionEntriesWithTx(disableSetOpToStartTx, false));
+ }
+
+ private void verifyRegionKeysetWithTx(boolean disableSetOpToStartTx, boolean isAccessor) {
+ CacheTransactionManager txMgr = basicGetCache().getCacheTransactionManager();
+ Region<Long, String> region = basicGetCache().getRegion(Region.SEPARATOR + REGION_NAME);
+ try {
+ txMgr.begin();
+ Collection<Long> set = region.keySet();
+ set.forEach((key) -> assertTrue(testData.keySet().contains(key)));
+ } finally {
+ validateTXManager(disableSetOpToStartTx, isAccessor);
+ txMgr.rollback();
+ }
+ }
+
+ private void verifyRegionValuesWithTx(boolean disableSetOpToStartTx, boolean isAccessor) {
+ CacheTransactionManager txMgr = basicGetCache().getCacheTransactionManager();
+ Region<Long, String> region = basicGetCache().getRegion(Region.SEPARATOR + REGION_NAME);
+ try {
+ txMgr.begin();
+ Collection<String> set = region.values();
+ set.forEach((value) -> assertTrue(testData.values().contains(value)));
+ } finally {
+ validateTXManager(disableSetOpToStartTx, isAccessor);
+ txMgr.rollback();
+ }
+ }
+
+ private void verifyRegionEntriesWithTx(boolean disableSetOpToStartTx, boolean isAccessor) {
+ CacheTransactionManager txMgr = basicGetCache().getCacheTransactionManager();
+ Region<Long, String> region = basicGetCache().getRegion(Region.SEPARATOR + REGION_NAME);
+ try {
+ txMgr.begin();
+ Collection<Map.Entry<Long, String>> set = region.entrySet();
+ set.forEach((entry) -> {
+ assertTrue(testData.values().contains(entry.getValue()));
+ assertTrue(testData.keySet().contains(entry.getKey()));
+ });
+ } finally {
+ validateTXManager(disableSetOpToStartTx, isAccessor);
+ txMgr.rollback();
+ }
+ }
+
+ private void validateTXManager(boolean disableSetOpToStartTx, boolean isAccessor) {
+ assertNotNull(TXManagerImpl.getCurrentTXState());
+ if (disableSetOpToStartTx || isAccessor) {
+ assertFalse(((TXStateProxyImpl) TXManagerImpl.getCurrentTXState()).hasRealDeal());
+ } else {
+ assertTrue(((TXStateProxyImpl) TXManagerImpl.getCurrentTXState()).hasRealDeal());
+ }
+ }
+
+ private void verifyNoTxState() {
+ TXManagerImpl mgr = getCache().getTxManager();
+ assertEquals(0, mgr.hostedTransactionsInProgressForTest());
+ }
+
+ final String restoreSetOperationTransactionBehavior = "restoreSetOperationTransactionBehavior";
+ final String RESTORE_SET_OPERATION_PROPERTY =
+ (System.currentTimeMillis() % 2 == 0 ? DistributionConfig.GEMFIRE_PREFIX : "geode.")
+ + restoreSetOperationTransactionBehavior;
+
+ private void createCache(boolean disableSetOpToStartTx) {
+ if (disableSetOpToStartTx) {
+ logger.info("setting system property {} to true ", RESTORE_SET_OPERATION_PROPERTY);
+ System.setProperty(RESTORE_SET_OPERATION_PROPERTY, "true");
+ }
+ getCache();
+ }
+
+ private void createPR(boolean isAccessor) {
+ basicGetCache().createRegionFactory(RegionShortcut.PARTITION)
+ .setPartitionAttributes(new PartitionAttributesFactory<Long, String>().setTotalNumBuckets(3)
+ .setLocalMaxMemory(isAccessor ? 0 : 1).create())
+ .create(REGION_NAME);
+ }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
index f888fef..a83a6b7 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
@@ -85,9 +85,9 @@ public class ClientServerJTADUnitTest extends JUnit4CacheTestCase {
TXManagerImpl mgr = (TXManagerImpl) getCache().getCacheTransactionManager();
mgr.begin();
region.put(key, newValue);
- TXStateProxyImpl tx = (TXStateProxyImpl) mgr.internalSuspend();
+ TXStateProxyImpl tx = (TXStateProxyImpl) mgr.pauseTransaction();
ClientTXStateStub txStub = (ClientTXStateStub) tx.getRealDeal(null, null);
- mgr.internalResume(tx);
+ mgr.unpauseTransaction(tx);
try {
txStub.beforeCompletion();
fail("expected to get CommitConflictException");
@@ -145,9 +145,9 @@ public class ClientServerJTADUnitTest extends JUnit4CacheTestCase {
TXManagerImpl mgr = (TXManagerImpl) getCache().getCacheTransactionManager();
mgr.begin();
region.put(key, newValue);
- TXStateProxyImpl tx = (TXStateProxyImpl) mgr.internalSuspend();
+ TXStateProxyImpl tx = (TXStateProxyImpl) mgr.pauseTransaction();
ClientTXStateStub txStub = (ClientTXStateStub) tx.getRealDeal(null, null);
- mgr.internalResume(tx);
+ mgr.unpauseTransaction(tx);
txStub.beforeCompletion();
if (withWait) {
getBlackboard().signalGate(first);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/jta/SetOperationJTAJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/jta/SetOperationJTAJUnitTest.java
new file mode 100644
index 0000000..2666ed5
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/jta/SetOperationJTAJUnitTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.jta;
+
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.naming.Context;
+import javax.transaction.UserTransaction;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+import org.apache.logging.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+
+@Category(IntegrationTest.class)
+@RunWith(JUnitParamsRunner.class)
+public class SetOperationJTAJUnitTest {
+ private static final Logger logger = LogService.getLogger();
+ private static final String REGION_NAME = "region1";
+
+ private Map<Long, String> testData;
+ private Cache cache;
+
+ @Rule
+ public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
+
+ @Before
+ public void setup() {
+ testData = new HashMap<>();
+ testData.put(1L, "value1");
+ testData.put(2L, "value2");
+ testData.put(3L, "duplicateValue");
+ testData.put(4L, "duplicateValue");
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ closeCache();
+ }
+
+ @Test
+ @Parameters({"true", "false"})
+ public void testRegionKeysetWithJTA(boolean disableSetOpToStartJTA) throws Exception {
+ Region<Long, String> region = setupAndLoadRegion(disableSetOpToStartJTA);
+ Context ctx = cache.getJNDIContext();
+ UserTransaction userTX = startUserTransaction(ctx);
+ try {
+ userTX.begin();
+ Collection<Long> set = region.keySet();
+ set.forEach((key) -> assertTrue(testData.keySet().contains(key)));
+ } finally {
+ validateTXManager(disableSetOpToStartJTA);
+ if (!disableSetOpToStartJTA) {
+ userTX.rollback();
+ }
+ }
+ }
+
+ @Test
+ @Parameters({"true", "false"})
+ public void testRegionValuesWithJTA(boolean disableSetOpToStartJTA) throws Exception {
+ Region<Long, String> region = setupAndLoadRegion(disableSetOpToStartJTA);
+ Context ctx = cache.getJNDIContext();
+ UserTransaction userTX = startUserTransaction(ctx);
+ try {
+ userTX.begin();
+ Collection<String> set = region.values();
+ set.forEach((value) -> assertTrue(testData.values().contains(value)));
+ } finally {
+ validateTXManager(disableSetOpToStartJTA);
+ if (!disableSetOpToStartJTA) {
+ userTX.rollback();
+ }
+ }
+ }
+
+ @Test
+ @Parameters({"true", "false"})
+ public void testRegionEntriesWithJTA(boolean disableSetOpToStartJTA) throws Exception {
+ Region<Long, String> region = setupAndLoadRegion(disableSetOpToStartJTA);
+ Context ctx = cache.getJNDIContext();
+ UserTransaction userTX = startUserTransaction(ctx);
+ try {
+ userTX.begin();
+ Collection<Map.Entry<Long, String>> set = region.entrySet();
+ set.forEach((entry) -> {
+ assertTrue(testData.values().contains(entry.getValue()));
+ assertTrue(testData.keySet().contains(entry.getKey()));
+ });
+ } finally {
+ validateTXManager(disableSetOpToStartJTA);
+ if (!disableSetOpToStartJTA) {
+ userTX.rollback();
+ }
+ }
+ }
+
+ private Region<Long, String> setupAndLoadRegion(boolean disableSetOpToStartTx) {
+ this.cache = createCache(disableSetOpToStartTx);
+ Region<Long, String> region = createRegion(cache);
+ testData.forEach((k, v) -> region.put(k, v));
+ return region;
+ }
+
+ private UserTransaction startUserTransaction(Context ctx) throws Exception {
+ return (UserTransaction) ctx.lookup("java:/UserTransaction");
+ }
+
+ private void validateTXManager(boolean disableSetOpToStartTx) {
+ if (disableSetOpToStartTx) {
+ assertNull(TXManagerImpl.getCurrentTXState());
+ } else {
+ assertNotNull(TXManagerImpl.getCurrentTXState());
+ assertTrue(((TXStateProxyImpl) TXManagerImpl.getCurrentTXState()).hasRealDeal());
+ }
+ }
+
+ protected Region<Long, String> createRegion(Cache cache) {
+ RegionFactory<Long, String> rf = cache.createRegionFactory(RegionShortcut.REPLICATE);
+ Region<Long, String> r = rf.create(REGION_NAME);
+ return r;
+ }
+
+ final String restoreSetOperationTransactionBehavior = "restoreSetOperationTransactionBehavior";
+ final String RESTORE_SET_OPERATION_PROPERTY =
+ (System.currentTimeMillis() % 2 == 0 ? DistributionConfig.GEMFIRE_PREFIX : "geode.")
+ + restoreSetOperationTransactionBehavior;
+
+ private Cache createCache(boolean disableSetOpToStartJTA) {
+ if (disableSetOpToStartJTA) {
+ logger.info("setting system property {} to true ", RESTORE_SET_OPERATION_PROPERTY);
+ System.setProperty(RESTORE_SET_OPERATION_PROPERTY, "true");
+ }
+ CacheFactory cf = new CacheFactory().set(MCAST_PORT, "0");
+ this.cache = (GemFireCacheImpl) cf.create();
+ return this.cache;
+ }
+
+ protected void closeCache() {
+ if (this.cache != null) {
+ Cache c = this.cache;
+ this.cache = null;
+ c.close();
+ }
+ }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/lang/SystemPropertyHelperJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/lang/SystemPropertyHelperJUnitTest.java
new file mode 100644
index 0000000..53f5655
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/lang/SystemPropertyHelperJUnitTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.lang;
+
+import static org.junit.Assert.*;
+
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(UnitTest.class)
+public class SystemPropertyHelperJUnitTest {
+ String restoreSetOperationTransactionBehavior = "restoreSetOperationTransactionBehavior";
+
+ @Test
+ public void testRestoreSetOperationTransactionBehaviorDefaultToFalse() {
+ assertFalse(SystemPropertyHelper.restoreSetOperationTransactionBehavior());
+ }
+
+ @Test
+ public void testRestoreSetOperationTransactionBehaviorSystemProperty() {
+ String gemfirePrefixProperty = "gemfire." + restoreSetOperationTransactionBehavior;
+ System.setProperty(gemfirePrefixProperty, "true");
+ assertTrue(SystemPropertyHelper.restoreSetOperationTransactionBehavior());
+ System.clearProperty(gemfirePrefixProperty);
+
+ String geodePrefixProperty = "geode." + restoreSetOperationTransactionBehavior;
+ System.setProperty(geodePrefixProperty, "true");
+ assertTrue(SystemPropertyHelper.restoreSetOperationTransactionBehavior());
+ System.clearProperty(geodePrefixProperty);
+ }
+
+ @Test
+ public void testRestoreSetOperationTransactionBehaviorGeodePreference() {
+ String gemfirePrefixProperty = "gemfire." + restoreSetOperationTransactionBehavior;
+ String geodePrefixProperty = "geode." + restoreSetOperationTransactionBehavior;
+ System.setProperty(geodePrefixProperty, "false");
+ System.setProperty(gemfirePrefixProperty, "true");
+ assertFalse(SystemPropertyHelper.restoreSetOperationTransactionBehavior());
+ System.clearProperty(geodePrefixProperty);
+ System.clearProperty(gemfirePrefixProperty);
+ }
+
+}
--
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <co...@geode.apache.org>'].