You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2019/10/17 16:52:49 UTC
[geode] branch feature/GEODE-7273 updated: GEODE-7273: Able to
detect not colocated transaction
This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch feature/GEODE-7273
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-7273 by this push:
new 07402f7 GEODE-7273: Able to detect not colocated transaction
07402f7 is described below
commit 07402f794af108efe0db0f0db4e05038682d7bbe
Author: Eric Shu <es...@EricMacBookPro.local>
AuthorDate: Thu Oct 17 09:42:14 2019 -0700
GEODE-7273: Able to detect not colocated transaction
* Detect a non-colocated transaction when it is caused by a first operation on a
replicate region followed by an operation on a partitioned region.
* Make sure transaction host can detect this and throw TransactionDataNotColocatedException.
* Transaction host will throw the appropriate TransactionException based on the operations
executed.
* Remote non host will rely on the TransactionException thrown from the tx host.
---
...rverNotColocatedTransactionDistributedTest.java | 285 ++++++++++-
.../org/apache/geode/internal/cache/TXState.java | 6 +-
.../geode/internal/cache/TXStateProxyImpl.java | 202 ++++++--
.../internal/cache/tx/PartitionedTXRegionStub.java | 37 +-
.../geode/internal/cache/TXStateProxyImplTest.java | 84 +++
.../cache/tx/PartitionedTXRegionStubTest.java | 562 +++++++++++++++++++++
6 files changed, 1100 insertions(+), 76 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerNotColocatedTransactionDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerNotColocatedTransactionDistributedTest.java
index 906a47c..d55b31e 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerNotColocatedTransactionDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerNotColocatedTransactionDistributedTest.java
@@ -34,8 +34,10 @@ import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.TransactionDataNotColocatedException;
+import org.apache.geode.cache.TransactionException;
import org.apache.geode.cache.client.ClientRegionFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.Pool;
import org.apache.geode.cache.client.PoolFactory;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.client.internal.PoolImpl;
@@ -50,6 +52,7 @@ public class ClientServerNotColocatedTransactionDistributedTest implements Seria
private String hostName;
private String uniqueName;
private String regionName;
+ private String replicateRegionName;
private VM server1;
private VM server2;
@@ -73,12 +76,13 @@ public class ClientServerNotColocatedTransactionDistributedTest implements Seria
hostName = getHostName();
uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
regionName = uniqueName + "_region";
+ replicateRegionName = uniqueName + "_replicateRegion";
}
@Test
- public void getEntryOnRemoteNodeInATransactionThrowsTransactionDataNotColocatedException() {
+ public void getOnRemoteNodeInATransactionThrowsTransactionDataNotColocatedException() {
int initialPuts = 4;
- setupClientAndServerForMultipleTransactions(initialPuts);
+ setupClientAndServer(initialPuts);
TXManagerImpl txManager =
(TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
Region<Integer, Integer> region = clientCacheRule.getClientCache().getRegion(regionName);
@@ -91,7 +95,7 @@ public class ClientServerNotColocatedTransactionDistributedTest implements Seria
}
}
- private void setupClientAndServerForMultipleTransactions(int initialPuts) {
+ private void setupClientAndServer(int initialPuts) {
int port1 = server1.invoke(() -> createServerRegion(2, false, 0));
int port2 = server2.invoke(() -> createServerRegion(2, false, 0));
server1.invoke(() -> doPuts(initialPuts));
@@ -150,7 +154,7 @@ public class ClientServerNotColocatedTransactionDistributedTest implements Seria
@Test
public void getAllOnMultipleNodesInATransactionThrowsTransactionDataNotColocatedException() {
int initialPuts = 4;
- setupClientAndServerForMultipleTransactions(initialPuts);
+ setupClientAndServer(initialPuts);
TXManagerImpl txManager =
(TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
Region<Integer, Integer> region = clientCacheRule.getClientCache().getRegion(regionName);
@@ -174,7 +178,7 @@ public class ClientServerNotColocatedTransactionDistributedTest implements Seria
@Test
public void putOnRemoteNodeInATransactionThrowsTransactionDataNotColocatedException() {
int initialPuts = 4;
- setupClientAndServerForMultipleTransactions(initialPuts);
+ setupClientAndServer(initialPuts);
TXManagerImpl txManager =
(TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
Region<Integer, Integer> region = clientCacheRule.getClientCache().getRegion(regionName);
@@ -196,7 +200,7 @@ public class ClientServerNotColocatedTransactionDistributedTest implements Seria
@Test
public void putAllOnMultipleNodesInATransactionThrowsTransactionDataNotColocatedException() {
int initialPuts = 4;
- setupClientAndServerForMultipleTransactions(initialPuts);
+ setupClientAndServer(initialPuts);
TXManagerImpl txManager =
(TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
Region<Integer, Integer> region = clientCacheRule.getClientCache().getRegion(regionName);
@@ -216,4 +220,273 @@ public class ClientServerNotColocatedTransactionDistributedTest implements Seria
}
region.putAll(map);
}
+
+ @Test
+ public void invalidateOnRemoteNodeInATransactionThrowsTransactionDataNotColocatedException() {
+ int initialPuts = 4;
+ setupClientAndServer(initialPuts);
+ TXManagerImpl txManager =
+ (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
+ Region<Integer, Integer> region = clientCacheRule.getClientCache().getRegion(regionName);
+ txManager.begin();
+ try {
+ Throwable caughtException =
+ catchThrowable(() -> doTransactionalInvalidates(initialPuts, region));
+ assertThat(caughtException).isInstanceOf(TransactionDataNotColocatedException.class);
+ } finally {
+ txManager.commit();
+ }
+ }
+
+ private void doTransactionalInvalidates(int initialPuts, Region<Integer, Integer> region) {
+ for (int i = 1; i < initialPuts; i++) {
+ region.invalidate(i);
+ }
+ }
+
+ @Test
+ public void destroyOnRemoteNodeInATransactionThrowsTransactionDataNotColocatedException() {
+ int initialPuts = 4;
+ setupClientAndServer(initialPuts);
+ TXManagerImpl txManager =
+ (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
+ Region<Integer, Integer> region = clientCacheRule.getClientCache().getRegion(regionName);
+ txManager.begin();
+ try {
+ Throwable caughtException =
+ catchThrowable(() -> doTransactionalDestroys(initialPuts, region));
+ assertThat(caughtException).isInstanceOf(TransactionDataNotColocatedException.class);
+ } finally {
+ txManager.commit();
+ }
+ }
+
+ private void doTransactionalDestroys(int initialPuts, Region<Integer, Integer> region) {
+ for (int i = 1; i < initialPuts; i++) {
+ region.destroy(i);
+ }
+ }
+
+ @Test
+ public void getEntryOnRemoteNodeInATransactionThrowsTransactionDataNotColocatedException() {
+ int initialPuts = 4;
+ setupClientAndServer(initialPuts);
+ TXManagerImpl txManager =
+ (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
+ Region<Integer, Integer> region = clientCacheRule.getClientCache().getRegion(regionName);
+ txManager.begin();
+ try {
+ Throwable caughtException =
+ catchThrowable(() -> doTransactionalGetEntries(initialPuts, region));
+ assertThat(caughtException).isInstanceOf(TransactionDataNotColocatedException.class);
+ } finally {
+ txManager.commit();
+ }
+ }
+
+ private void doTransactionalGetEntries(int initialPuts, Region<Integer, Integer> region) {
+ for (int i = 1; i < initialPuts; i++) {
+ region.getEntry(i);
+ }
+ }
+
+ @Test
+ public void containKeyOnRemoteNodeInATransactionThrowsTransactionDataNotColocatedException() {
+ int initialPuts = 4;
+ setupClientAndServer(initialPuts);
+ TXManagerImpl txManager =
+ (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
+ Region<Integer, Integer> region = clientCacheRule.getClientCache().getRegion(regionName);
+ txManager.begin();
+ try {
+ Throwable caughtException =
+ catchThrowable(() -> doTransactionalContainsKey(initialPuts, region));
+ assertThat(caughtException).isInstanceOf(TransactionDataNotColocatedException.class);
+ } finally {
+ txManager.commit();
+ }
+ }
+
+ private void doTransactionalContainsKey(int initialPuts, Region<Integer, Integer> region) {
+ for (int i = 1; i < initialPuts; i++) {
+ region.containsKey(i);
+ }
+ }
+
+ @Test
+ public void containsValueForKeyOnRemoteNodeInATransactionThrowsTransactionDataNotColocatedException() {
+ int initialPuts = 4;
+ setupClientAndServer(initialPuts);
+ TXManagerImpl txManager =
+ (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
+ Region<Integer, Integer> region = clientCacheRule.getClientCache().getRegion(regionName);
+ txManager.begin();
+ try {
+ Throwable caughtException =
+ catchThrowable(() -> doTransactionalContainsValueForKey(initialPuts, region));
+ assertThat(caughtException).isInstanceOf(TransactionDataNotColocatedException.class);
+ } finally {
+ txManager.commit();
+ }
+ }
+
+ private void doTransactionalContainsValueForKey(int initialPuts,
+ Region<Integer, Integer> region) {
+ for (int i = 1; i < initialPuts; i++) {
+ region.containsValueForKey(i);
+ }
+ }
+
+ @Test
+ public void getOnReplicateRegionThenPartitionedRegionInATransactionThrowsTransactionDataNotColocatedException() {
+ int initialPuts = 4;
+ setupClientAndServerWithTwoRegions(initialPuts);
+ TXManagerImpl txManager =
+ (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
+ txManager.begin();
+ try {
+ // do first operation on replicate region
+ Region<Integer, Integer> replicateRegion =
+ clientCacheRule.getClientCache().getRegion(replicateRegionName);
+ replicateRegion.get(1);
+ Region<Integer, Integer> partitionedRegion =
+ clientCacheRule.getClientCache().getRegion(regionName);
+ try {
+ partitionedRegion.get(1);
+ } catch (TransactionException exception) {
+ assertThat(exception).isInstanceOf(TransactionDataNotColocatedException.class);
+ }
+ } finally {
+ txManager.commit();
+ }
+ }
+
+ private void doPutsInRegions(int numOfEntries) {
+ for (int i = 0; i <= numOfEntries; i++) {
+ cacheRule.getCache().getRegion(regionName).put(i, i);
+ cacheRule.getCache().getRegion(replicateRegionName).put(i, i);
+ }
+ }
+
+ private void setupClientAndServerWithTwoRegions(int initialPuts) {
+ int port1 = server1.invoke(() -> createServerRegion(2, false, 0));
+ server1.invoke(() -> createServerReplicateRegion());
+ int port2 = server2.invoke(() -> createServerRegion(2, false, 0));
+ server2.invoke(() -> createServerReplicateRegion());
+ server1.invoke(() -> doPutsInRegions(initialPuts));
+
+ createClientRegion(port1, port2);
+ createClientRegion(replicateRegionName);
+ }
+
+ private void createServerReplicateRegion() {
+ cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.REPLICATE)
+ .create(replicateRegionName);
+ }
+
+ private void createClientRegion(String regionName) {
+ Pool pool = clientCacheRule.getClientCache().getDefaultPool();
+ clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.LOCAL)
+ .setPoolName(pool.getName())
+ .create(regionName);
+ }
+
+ @Test
+ public void putOnReplicateRegionThenPartitionedRegionInATransactionThrowsTransactionDataNotColocatedException() {
+ int initialPuts = 4;
+ setupClientAndServerWithTwoRegions(initialPuts);
+ TXManagerImpl txManager =
+ (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
+ txManager.begin();
+ try {
+ // do first operation on replicate region
+ Region<Integer, Integer> replicateRegion =
+ clientCacheRule.getClientCache().getRegion(replicateRegionName);
+ replicateRegion.put(1, 5);
+ Region<Integer, Integer> partitionedRegion =
+ clientCacheRule.getClientCache().getRegion(regionName);
+ try {
+ partitionedRegion.put(1, 6);
+ } catch (TransactionException exception) {
+ assertThat(exception).isInstanceOf(TransactionDataNotColocatedException.class);
+ }
+ } finally {
+ txManager.commit();
+ }
+ }
+
+ @Test
+ public void putAllOnReplicateRegionThenPartitionedRegionInATransactionThrowsTransactionDataNotColocatedException() {
+ int initialPuts = 4;
+ setupClientAndServerWithTwoRegions(initialPuts);
+ TXManagerImpl txManager =
+ (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
+ txManager.begin();
+ try {
+ // do first operation on replicate region
+ Region<Integer, Integer> replicateRegion =
+ clientCacheRule.getClientCache().getRegion(replicateRegionName);
+ HashMap<Integer, Integer> map = new HashMap();
+ map.put(1, 5);
+ replicateRegion.putAll(map);
+ Region<Integer, Integer> partitionedRegion =
+ clientCacheRule.getClientCache().getRegion(regionName);
+
+ try {
+ partitionedRegion.putAll(map);
+ } catch (TransactionException exception) {
+ assertThat(exception).isInstanceOf(TransactionDataNotColocatedException.class);
+ }
+ } finally {
+ txManager.commit();
+ }
+ }
+
+ @Test
+ public void invalidateOnReplicateRegionThenPartitionedRegionInATransactionThrowsTransactionDataNotColocatedException() {
+ int initialPuts = 4;
+ setupClientAndServerWithTwoRegions(initialPuts);
+ TXManagerImpl txManager =
+ (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
+ txManager.begin();
+ try {
+ // do first operation on replicate region
+ Region<Integer, Integer> replicateRegion =
+ clientCacheRule.getClientCache().getRegion(replicateRegionName);
+ replicateRegion.invalidate(1);
+ Region<Integer, Integer> partitionedRegion =
+ clientCacheRule.getClientCache().getRegion(regionName);
+ try {
+ partitionedRegion.invalidate(1);
+ } catch (TransactionException exception) {
+ assertThat(exception).isInstanceOf(TransactionDataNotColocatedException.class);
+ }
+ } finally {
+ txManager.commit();
+ }
+ }
+
+ @Test
+ public void destroyOnReplicateRegionThenPartitionedRegionInATransactionThrowsTransactionDataNotColocatedException() {
+ int initialPuts = 4;
+ setupClientAndServerWithTwoRegions(initialPuts);
+ TXManagerImpl txManager =
+ (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
+ txManager.begin();
+ try {
+ // do first operation on replicate region
+ Region<Integer, Integer> replicateRegion =
+ clientCacheRule.getClientCache().getRegion(replicateRegionName);
+ replicateRegion.destroy(1);
+ Region<Integer, Integer> partitionedRegion =
+ clientCacheRule.getClientCache().getRegion(regionName);
+ try {
+ partitionedRegion.destroy(1);
+ } catch (TransactionException exception) {
+ assertThat(exception).isInstanceOf(TransactionDataNotColocatedException.class);
+ }
+ } finally {
+ txManager.commit();
+ }
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index ff68dc2..c6281c9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -1599,7 +1599,7 @@ public class TXState implements TXStateInterface {
public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion, boolean updateStats,
boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent,
boolean returnTombstones, boolean retainResult, boolean createIfAbsent) {
- TXEntryState tx = proxy.txReadEntry(keyInfo, localRegion, true, createIfAbsent);
+ TXEntryState tx = txReadEntry(keyInfo, localRegion, true, createIfAbsent);
if (tx != null) {
Object v = tx.getValue(keyInfo, localRegion, preferCD);
if (!disableCopyOnRead) {
@@ -2152,4 +2152,8 @@ public class TXState implements TXStateInterface {
boolean isClosed() {
return closed;
}
+
+ public boolean hasPerformedAnyOperation() {
+ return regions.size() != 0;
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
index f9712df..0ec139c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
@@ -70,6 +70,8 @@ public class TXStateProxyImpl implements TXStateProxy {
*/
private Map<Integer, Boolean> buckets = new HashMap<Integer, Boolean>();
+ private boolean firstOperationOnPartitionedRegion = false;
+
protected volatile TXStateInterface realDeal;
protected boolean inProgress = true;
@@ -164,6 +166,11 @@ public class TXStateProxyImpl implements TXStateProxy {
logger.debug("Built a new TXState: {} me:{}", this.realDeal, this.txMgr.getDM().getId());
}
}
+ if (isRealDealLocal() && !((TXState) realDeal).hasPerformedAnyOperation()) {
+ if (r != null && (r instanceof PartitionedRegion || r.isUsedForPartitionedRegionBucket())) {
+ firstOperationOnPartitionedRegion = true;
+ }
+ }
return this.realDeal;
}
@@ -237,7 +244,7 @@ public class TXStateProxyImpl implements TXStateProxy {
}
}
- private TransactionException getTransactionException(KeyInfo keyInfo, GemFireException e) {
+ TransactionException getTransactionException(KeyInfo keyInfo, GemFireException e) {
if (isRealDealLocal() && !buckets.isEmpty() && !buckets.containsKey(keyInfo.getBucketId())) {
TransactionException ex = new TransactionDataNotColocatedException(
String.format("Key %s is not colocated with transaction",
@@ -248,6 +255,14 @@ public class TXStateProxyImpl implements TXStateProxy {
Throwable ex = e;
while (ex != null) {
if (ex instanceof PrimaryBucketException) {
+ if (isRealDealLocal() && !firstOperationOnPartitionedRegion) {
+ return new TransactionDataNotColocatedException(
+ String.format(
+ "Key %s is not colocated with transaction. First operation in a transaction "
+ + "should be on a partitioned region when there are operations on both "
+ + "partitioned regions and replicate regions.",
+ keyInfo.getKey()));
+ }
return new TransactionDataRebalancedException(
"Transactional data moved, due to rebalancing.");
}
@@ -263,12 +278,17 @@ public class TXStateProxyImpl implements TXStateProxy {
boolean retVal = getRealDeal(keyInfo, region).containsValueForKey(keyInfo, region);
trackBucketForTx(keyInfo);
return retVal;
- } catch (TransactionDataRebalancedException | PrimaryBucketException re) {
- throw getTransactionException(keyInfo, re);
+ } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+ if (isRealDealLocal()) {
+ throw getTransactionException(keyInfo, transactionDataRebalancedException);
+ }
+ throw transactionDataRebalancedException;
+ } catch (PrimaryBucketException primaryBucketException) {
+ throw getTransactionException(keyInfo, primaryBucketException);
}
}
- private void trackBucketForTx(KeyInfo keyInfo) {
+ void trackBucketForTx(KeyInfo keyInfo) {
if (keyInfo.getBucketId() >= 0) {
if (logger.isDebugEnabled()) {
logger.debug("adding bucket:{} for tx:{}", keyInfo.getBucketId(), getTransactionId());
@@ -287,8 +307,13 @@ public class TXStateProxyImpl implements TXStateProxy {
getRealDeal(event.getKeyInfo(), event.getRegion()).destroyExistingEntry(event, cacheWrite,
expectedOldValue);
trackBucketForTx(event.getKeyInfo());
- } catch (TransactionDataRebalancedException | PrimaryBucketException re) {
- throw getTransactionException(event.getKeyInfo(), re);
+ } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+ if (isRealDealLocal()) {
+ throw getTransactionException(event.getKeyInfo(), transactionDataRebalancedException);
+ }
+ throw transactionDataRebalancedException;
+ } catch (PrimaryBucketException primaryBucketException) {
+ throw getTransactionException(event.getKeyInfo(), primaryBucketException);
}
}
@@ -312,14 +337,24 @@ public class TXStateProxyImpl implements TXStateProxy {
public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion, boolean updateStats,
boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent,
boolean returnTombstones, boolean retainResult, boolean createIfAbsent) {
- Object val = getRealDeal(keyInfo, localRegion).getDeserializedValue(keyInfo, localRegion,
- updateStats, disableCopyOnRead, preferCD, null, false, retainResult, createIfAbsent);
- if (val != null) {
- // fixes bug 51057: TXStateStub on client always returns null, so do not increment
- // the operation count it will be incremented in findObject()
- this.operationCount++;
+ try {
+ Object val = getRealDeal(keyInfo, localRegion).getDeserializedValue(keyInfo, localRegion,
+ updateStats, disableCopyOnRead, preferCD, null, false, retainResult, createIfAbsent);
+ trackBucketForTx(keyInfo);
+ if (val != null) {
+ // fixes bug 51057: TXStateStub on client always returns null, so do not increment
+ // the operation count it will be incremented in findObject()
+ this.operationCount++;
+ }
+ return val;
+ } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+ if (isRealDealLocal()) {
+ throw getTransactionException(keyInfo, transactionDataRebalancedException);
+ }
+ throw transactionDataRebalancedException;
+ } catch (PrimaryBucketException primaryBucketException) {
+ throw getTransactionException(keyInfo, primaryBucketException);
}
- return val;
}
@Override
@@ -329,8 +364,13 @@ public class TXStateProxyImpl implements TXStateProxy {
Entry retVal = getRealDeal(keyInfo, region).getEntry(keyInfo, region, allowTombstones);
trackBucketForTx(keyInfo);
return retVal;
- } catch (TransactionDataRebalancedException | PrimaryBucketException re) {
- throw getTransactionException(keyInfo, re);
+ } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+ if (isRealDealLocal()) {
+ throw getTransactionException(keyInfo, transactionDataRebalancedException);
+ }
+ throw transactionDataRebalancedException;
+ } catch (PrimaryBucketException primaryBucketException) {
+ throw getTransactionException(keyInfo, primaryBucketException);
}
}
@@ -365,8 +405,13 @@ public class TXStateProxyImpl implements TXStateProxy {
getRealDeal(event.getKeyInfo(), event.getRegion()).invalidateExistingEntry(event,
invokeCallbacks, forceNewEntry);
trackBucketForTx(event.getKeyInfo());
- } catch (TransactionDataRebalancedException | PrimaryBucketException re) {
- throw getTransactionException(event.getKeyInfo(), re);
+ } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+ if (isRealDealLocal()) {
+ throw getTransactionException(event.getKeyInfo(), transactionDataRebalancedException);
+ }
+ throw transactionDataRebalancedException;
+ } catch (PrimaryBucketException primaryBucketException) {
+ throw getTransactionException(event.getKeyInfo(), primaryBucketException);
}
}
@@ -422,8 +467,13 @@ public class TXStateProxyImpl implements TXStateProxy {
.txPutEntry(event, ifNew, requireOldValue, checkResources, expectedOldValue);
trackBucketForTx(event.getKeyInfo());
return retVal;
- } catch (TransactionDataRebalancedException | PrimaryBucketException re) {
- throw getTransactionException(event.getKeyInfo(), re);
+ } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+ if (isRealDealLocal()) {
+ throw getTransactionException(event.getKeyInfo(), transactionDataRebalancedException);
+ }
+ throw transactionDataRebalancedException;
+ } catch (PrimaryBucketException primaryBucketException) {
+ throw getTransactionException(event.getKeyInfo(), primaryBucketException);
}
}
@@ -436,8 +486,13 @@ public class TXStateProxyImpl implements TXStateProxy {
rememberRead, createTxEntryIfAbsent);
trackBucketForTx(keyInfo);
return retVal;
- } catch (TransactionDataRebalancedException | PrimaryBucketException re) {
- throw getTransactionException(keyInfo, re);
+ } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+ if (isRealDealLocal()) {
+ throw getTransactionException(keyInfo, transactionDataRebalancedException);
+ }
+ throw transactionDataRebalancedException;
+ } catch (PrimaryBucketException primaryBucketException) {
+ throw getTransactionException(keyInfo, primaryBucketException);
}
}
@@ -485,8 +540,13 @@ public class TXStateProxyImpl implements TXStateProxy {
boolean retVal = getRealDeal(keyInfo, localRegion).containsKey(keyInfo, localRegion);
trackBucketForTx(keyInfo);
return retVal;
- } catch (TransactionDataRebalancedException | PrimaryBucketException re) {
- throw getTransactionException(keyInfo, re);
+ } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+ if (isRealDealLocal()) {
+ throw getTransactionException(keyInfo, transactionDataRebalancedException);
+ }
+ throw transactionDataRebalancedException;
+ } catch (PrimaryBucketException primaryBucketException) {
+ throw getTransactionException(keyInfo, primaryBucketException);
}
}
@@ -531,8 +591,13 @@ public class TXStateProxyImpl implements TXStateProxy {
disableCopyOnRead, preferCD, requestingClient, clientEvent, false);
trackBucketForTx(key);
return retVal;
- } catch (TransactionDataRebalancedException | PrimaryBucketException re) {
- throw getTransactionException(key, re);
+ } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+ if (isRealDealLocal()) {
+ throw getTransactionException(key, transactionDataRebalancedException);
+ }
+ throw transactionDataRebalancedException;
+ } catch (PrimaryBucketException primaryBucketException) {
+ throw getTransactionException(key, primaryBucketException);
}
}
@@ -627,8 +692,13 @@ public class TXStateProxyImpl implements TXStateProxy {
ifOld, expectedOldValue, requireOldValue, lastModified, overwriteDestroyed);
trackBucketForTx(event.getKeyInfo());
return retVal;
- } catch (TransactionDataRebalancedException | PrimaryBucketException re) {
- throw getTransactionException(event.getKeyInfo(), re);
+ } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+ if (isRealDealLocal()) {
+ throw getTransactionException(event.getKeyInfo(), transactionDataRebalancedException);
+ }
+ throw transactionDataRebalancedException;
+ } catch (PrimaryBucketException primaryBucketException) {
+ throw getTransactionException(event.getKeyInfo(), primaryBucketException);
}
}
@@ -647,8 +717,20 @@ public class TXStateProxyImpl implements TXStateProxy {
ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent,
boolean returnTombstones) throws DataLocationException {
this.operationCount++;
- return getRealDeal(key, localRegion).getSerializedValue(localRegion, key, doNotLockEntry,
- requestingClient, clientEvent, returnTombstones);
+ try {
+ Object retVal =
+ getRealDeal(key, localRegion).getSerializedValue(localRegion, key, doNotLockEntry,
+ requestingClient, clientEvent, returnTombstones);
+ trackBucketForTx(key);
+ return retVal;
+ } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+ if (isRealDealLocal()) {
+ throw getTransactionException(key, transactionDataRebalancedException);
+ }
+ throw transactionDataRebalancedException;
+ } catch (PrimaryBucketException primaryBucketException) {
+ throw getTransactionException(key, primaryBucketException);
+ }
}
@Override
@@ -658,8 +740,19 @@ public class TXStateProxyImpl implements TXStateProxy {
this.operationCount++;
TXStateInterface tx = getRealDeal(event.getKeyInfo(), event.getRegion());
assert (tx instanceof TXState) : tx.getClass().getSimpleName();
- return tx.putEntryOnRemote(event, ifNew, ifOld, expectedOldValue, requireOldValue, lastModified,
- overwriteDestroyed);
+ try {
+ boolean retVal = tx.putEntryOnRemote(event, ifNew, ifOld, expectedOldValue, requireOldValue,
+ lastModified, overwriteDestroyed);
+ trackBucketForTx(event.getKeyInfo());
+ return retVal;
+ } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+ if (isRealDealLocal()) {
+ throw getTransactionException(event.getKeyInfo(), transactionDataRebalancedException);
+ }
+ throw transactionDataRebalancedException;
+ } catch (PrimaryBucketException primaryBucketException) {
+ throw getTransactionException(event.getKeyInfo(), primaryBucketException);
+ }
}
@Override
@@ -673,7 +766,17 @@ public class TXStateProxyImpl implements TXStateProxy {
this.operationCount++;
TXStateInterface tx = getRealDeal(event.getKeyInfo(), event.getRegion());
assert (tx instanceof TXState);
- tx.destroyOnRemote(event, cacheWrite, expectedOldValue);
+ try {
+ tx.destroyOnRemote(event, cacheWrite, expectedOldValue);
+ trackBucketForTx(event.getKeyInfo());
+ } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+ if (isRealDealLocal()) {
+ throw getTransactionException(event.getKeyInfo(), transactionDataRebalancedException);
+ }
+ throw transactionDataRebalancedException;
+ } catch (PrimaryBucketException primaryBucketException) {
+ throw getTransactionException(event.getKeyInfo(), primaryBucketException);
+ }
}
@Override
@@ -682,7 +785,17 @@ public class TXStateProxyImpl implements TXStateProxy {
this.operationCount++;
TXStateInterface tx = getRealDeal(event.getKeyInfo(), event.getRegion());
assert (tx instanceof TXState);
- tx.invalidateOnRemote(event, invokeCallbacks, forceNewEntry);
+ try {
+ tx.invalidateOnRemote(event, invokeCallbacks, forceNewEntry);
+ trackBucketForTx(event.getKeyInfo());
+ } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+ if (isRealDealLocal()) {
+ throw getTransactionException(event.getKeyInfo(), transactionDataRebalancedException);
+ }
+ throw transactionDataRebalancedException;
+ } catch (PrimaryBucketException primaryBucketException) {
+ throw getTransactionException(event.getKeyInfo(), primaryBucketException);
+ }
}
@Override
@@ -728,7 +841,16 @@ public class TXStateProxyImpl implements TXStateProxy {
this.operationCount++;
TXStateInterface tx = getRealDeal(keyInfo, localRegion);
assert (tx instanceof TXState);
- return tx.getEntryOnRemote(keyInfo, localRegion, allowTombstones);
+ try {
+ return tx.getEntryOnRemote(keyInfo, localRegion, allowTombstones);
+ } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+ if (isRealDealLocal()) {
+ throw getTransactionException(keyInfo, transactionDataRebalancedException);
+ }
+ throw transactionDataRebalancedException;
+ } catch (PrimaryBucketException primaryBucketException) {
+ throw getTransactionException(keyInfo, primaryBucketException);
+ }
}
public void forceLocalBootstrap() {
@@ -893,8 +1015,13 @@ public class TXStateProxyImpl implements TXStateProxy {
Entry retVal = getRealDeal(keyInfo, region).accessEntry(keyInfo, region);
trackBucketForTx(keyInfo);
return retVal;
- } catch (TransactionDataRebalancedException | PrimaryBucketException re) {
- throw getTransactionException(keyInfo, re);
+ } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+ if (isRealDealLocal()) {
+ throw getTransactionException(keyInfo, transactionDataRebalancedException);
+ }
+ throw transactionDataRebalancedException;
+ } catch (PrimaryBucketException primaryBucketException) {
+ throw getTransactionException(keyInfo, primaryBucketException);
}
}
@@ -1004,4 +1131,7 @@ public class TXStateProxyImpl implements TXStateProxy {
return onBehalfOfClientMember;
}
+ void setFirstOperationOnPartitionedRegion(boolean firstOperationOnPartitionedRegion) {
+ this.firstOperationOnPartitionedRegion = firstOperationOnPartitionedRegion;
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java
index 5e38c08..cf13e64 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java
@@ -77,10 +77,6 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
try {
pr.destroyRemotely(state.getTarget(), event.getKeyInfo().getBucketId(), event,
expectedOldValue);
- } catch (TransactionException e) {
- RuntimeException re = getTransactionException(event.getKeyInfo(), e);
- re.initCause(e.getCause());
- throw re;
} catch (PrimaryBucketException e) {
RuntimeException re = getTransactionException(event.getKeyInfo(), e);
re.initCause(e);
@@ -104,7 +100,7 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
}
- private RuntimeException getTransactionException(KeyInfo keyInfo, Throwable cause) {
+ RuntimeException getTransactionException(KeyInfo keyInfo, Throwable cause) {
region.getCancelCriterion().checkCancelInProgress(cause); // fixes bug 44567
Throwable ex = cause;
while (ex != null) {
@@ -151,7 +147,7 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
/**
* wait to retry after getting a ForceReattemptException
*/
- private void waitToRetry() {
+ void waitToRetry() {
// this is what PR operations do. The 2000ms is not used
(new RetryTimeKeeper(2000)).waitForBucketsRecovery();
}
@@ -167,10 +163,6 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
return e;
} catch (EntryNotFoundException enfe) {
return null;
- } catch (TransactionException e) {
- RuntimeException re = getTransactionException(keyInfo, e);
- re.initCause(e.getCause());
- throw re;
} catch (PrimaryBucketException e) {
RuntimeException re = getTransactionException(keyInfo, e);
re.initCause(e);
@@ -193,7 +185,7 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
}
- private void trackBucketForTx(KeyInfo keyInfo) {
+ void trackBucketForTx(KeyInfo keyInfo) {
if (region.getCache().getLogger().fineEnabled()) {
region.getCache().getLogger()
.fine("adding bucket:" + keyInfo.getBucketId() + " for tx:" + state.getTransactionId());
@@ -210,10 +202,6 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
PartitionedRegion pr = (PartitionedRegion) event.getRegion();
try {
pr.invalidateRemotely(state.getTarget(), event.getKeyInfo().getBucketId(), event);
- } catch (TransactionException e) {
- RuntimeException re = getTransactionException(event.getKeyInfo(), e);
- re.initCause(e.getCause());
- throw re;
} catch (PrimaryBucketException e) {
RuntimeException re = getTransactionException(event.getKeyInfo(), e);
re.initCause(e);
@@ -244,10 +232,6 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
keyInfo.getBucketId(), keyInfo.getKey());
trackBucketForTx(keyInfo);
return retVal;
- } catch (TransactionException e) {
- RuntimeException re = getTransactionException(keyInfo, e);
- re.initCause(e.getCause());
- throw re;
} catch (PrimaryBucketException e) {
RuntimeException re = getTransactionException(keyInfo, e);
re.initCause(e);
@@ -272,7 +256,7 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
/**
* @return true if the cause of the FRE is a BucketNotFoundException
*/
- private boolean isBucketNotFoundException(ForceReattemptException e) {
+ boolean isBucketNotFoundException(ForceReattemptException e) {
ForceReattemptException fre = e;
while (fre.getCause() != null && fre.getCause() instanceof ForceReattemptException) {
fre = (ForceReattemptException) fre.getCause();
@@ -288,10 +272,6 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
(InternalDistributedMember) state.getTarget(), keyInfo.getBucketId(), keyInfo.getKey());
trackBucketForTx(keyInfo);
return retVal;
- } catch (TransactionException e) {
- RuntimeException re = getTransactionException(keyInfo, e);
- re.initCause(e.getCause());
- throw re;
} catch (PrimaryBucketException e) {
RuntimeException re = getTransactionException(keyInfo, e);
re.initCause(e);
@@ -312,7 +292,6 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
}
}
-
@Override
public Object findObject(KeyInfo keyInfo, boolean isCreate, boolean generateCallbacks,
Object value, boolean peferCD, ClientProxyMembershipID requestingClient,
@@ -324,10 +303,6 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
retVal =
region.getRemotely((InternalDistributedMember) state.getTarget(), keyInfo.getBucketId(),
key, callbackArgument, peferCD, requestingClient, clientEvent, false);
- } catch (TransactionException e) {
- RuntimeException re = getTransactionException(keyInfo, e);
- re.initCause(e.getCause());
- throw re;
} catch (PrimaryBucketException e) {
RuntimeException re = getTransactionException(keyInfo, e);
re.initCause(e);
@@ -369,10 +344,6 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
try {
retVal =
pr.putRemotely(state.getTarget(), event, ifNew, ifOld, expectedOldValue, requireOldValue);
- } catch (TransactionException e) {
- RuntimeException re = getTransactionException(event.getKeyInfo(), e);
- re.initCause(e.getCause());
- throw re;
} catch (PrimaryBucketException e) {
RuntimeException re = getTransactionException(event.getKeyInfo(), e);
re.initCause(e);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateProxyImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateProxyImplTest.java
index 42fe93f..d7658f1 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateProxyImplTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateProxyImplTest.java
@@ -24,6 +24,10 @@ import static org.mockito.Mockito.when;
import org.junit.Before;
import org.junit.Test;
+import org.apache.geode.GemFireException;
+import org.apache.geode.cache.TransactionDataNotColocatedException;
+import org.apache.geode.cache.TransactionDataRebalancedException;
+import org.apache.geode.cache.TransactionException;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
@@ -157,4 +161,84 @@ public class TXStateProxyImplTest {
tx.setTarget(remoteMember);
}
+
+ @Test
+ public void txHostGetTransactionExceptionReturnsTransactionDataNotColocatedExceptionIfKeyNotInBuckets() {
+ TXStateProxyImpl tx = new TXStateProxyImpl(cache, txManager, txId, false, disabledClock());
+ tx.setLocalTXState(new TXState(tx, true, disabledClock()));
+ KeyInfo keyInfo1 = mock(KeyInfo.class);
+ when(keyInfo1.getBucketId()).thenReturn(1);
+ KeyInfo keyInfo2 = mock(KeyInfo.class);
+ when(keyInfo2.getBucketId()).thenReturn(2);
+ tx.trackBucketForTx(keyInfo1);
+
+ TransactionException transactionException =
+ tx.getTransactionException(keyInfo2, new PrimaryBucketException());
+
+ assertThat(transactionException).isInstanceOf(TransactionDataNotColocatedException.class);
+ }
+
+ @Test
+ public void txHostGetTransactionExceptionReturnsTransactionDataNotColocatedExceptionIfFirstOperationOnReplicate() {
+ TXStateProxyImpl tx = new TXStateProxyImpl(cache, txManager, txId, false, disabledClock());
+ tx.setLocalTXState(new TXState(tx, true, disabledClock()));
+ KeyInfo keyInfo = mock(KeyInfo.class);
+
+ TransactionException transactionException =
+ tx.getTransactionException(keyInfo, new PrimaryBucketException());
+
+ assertThat(transactionException).isInstanceOf(TransactionDataNotColocatedException.class);
+ }
+
+ @Test
+ public void txHostGetTransactionExceptionReturnsTransactionDataRebalancedExceptionIfFirstOperationOnPartitioned() {
+ TXStateProxyImpl tx = new TXStateProxyImpl(cache, txManager, txId, false, disabledClock());
+ tx.setLocalTXState(new TXState(tx, true, disabledClock()));
+ KeyInfo keyInfo = mock(KeyInfo.class);
+ GemFireException exception = mock(GemFireException.class);
+ when(exception.getCause()).thenReturn(new PrimaryBucketException());
+ tx.setFirstOperationOnPartitionedRegion(true);
+
+ TransactionException transactionException = tx.getTransactionException(keyInfo, exception);
+
+ assertThat(transactionException).isInstanceOf(TransactionDataRebalancedException.class);
+ }
+
+ @Test
+ public void txHostGetTransactionExceptionReturnsSameTransactionExceptionIfNotCausedByPrimaryBucketException() {
+ TXStateProxyImpl tx = new TXStateProxyImpl(cache, txManager, txId, false, disabledClock());
+ tx.setLocalTXState(new TXState(tx, true, disabledClock()));
+ TransactionException exception = mock(TransactionException.class);
+ KeyInfo keyInfo = mock(KeyInfo.class);
+
+ TransactionException transactionException = tx.getTransactionException(keyInfo, exception);
+
+ assertThat(transactionException).isSameAs(exception);
+ }
+
+ @Test
+ public void txStubGetTransactionExceptionReturnsTransactionDataRebalancedExceptionIfCausedByPrimaryBucketException() {
+ TXStateProxyImpl tx = new TXStateProxyImpl(cache, txManager, txId, false, disabledClock());
+ DistributedMember target = mock(InternalDistributedMember.class);
+ tx.setLocalTXState(new PeerTXStateStub(tx, target, null));
+ KeyInfo keyInfo = mock(KeyInfo.class);
+
+ TransactionException transactionException =
+ tx.getTransactionException(keyInfo, new PrimaryBucketException());
+
+ assertThat(transactionException).isInstanceOf(TransactionDataRebalancedException.class);
+ }
+
+ @Test
+ public void txStubGetTransactionExceptionReturnsSameTransactionExceptionIfNotCausedByPrimaryBucketException() {
+ TXStateProxyImpl tx = new TXStateProxyImpl(cache, txManager, txId, false, disabledClock());
+ DistributedMember target = mock(InternalDistributedMember.class);
+ tx.setLocalTXState(new PeerTXStateStub(tx, target, null));
+ TransactionException exception = mock(TransactionException.class);
+ KeyInfo keyInfo = mock(KeyInfo.class);
+
+ TransactionException transactionException = tx.getTransactionException(keyInfo, exception);
+
+ assertThat(transactionException).isSameAs(exception);
+ }
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStubTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStubTest.java
new file mode 100644
index 0000000..809b53c
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStubTest.java
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tx;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
+import org.apache.geode.cache.TransactionDataRebalancedException;
+import org.apache.geode.cache.TransactionException;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EntrySnapshot;
+import org.apache.geode.internal.cache.ForceReattemptException;
+import org.apache.geode.internal.cache.KeyInfo;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.TXStateStub;
+
+public class PartitionedTXRegionStubTest {
+ private TXStateStub txStateStub;
+ private PartitionedRegion partitionedRegion;
+ private EntryEventImpl event;
+ private Object expectedObject;
+ private TransactionException expectedException;
+ private DistributedMember remoteTransactionHost;
+ private KeyInfo keyInfo;
+ private Object key;
+
+ @Before
+ public void setup() {
+ txStateStub = mock(TXStateStub.class);
+ partitionedRegion = mock(PartitionedRegion.class, RETURNS_DEEP_STUBS);
+ event = mock(EntryEventImpl.class);
+ expectedObject = new Object();
+ expectedException = new TransactionException();
+ remoteTransactionHost = mock(InternalDistributedMember.class);
+ keyInfo = mock(KeyInfo.class);
+ key = new Object();
+ when(txStateStub.getTarget()).thenReturn(remoteTransactionHost);
+ when(event.getKeyInfo()).thenReturn(keyInfo);
+ when(keyInfo.getKey()).thenReturn(key);
+ when(keyInfo.getBucketId()).thenReturn(1);
+ }
+
+ @Test
+ public void destroyExistingEntryTracksBucketForTx() {
+ PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+ when(event.getRegion()).thenReturn(partitionedRegion);
+
+ stub.destroyExistingEntry(event, true, expectedObject);
+
+ verify(stub).trackBucketForTx(keyInfo);
+ }
+
+ @Test
+ public void destroyExistingEntryThrowsTransactionExceptionFromRemoteHost() throws Exception {
+ PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+ when(event.getRegion()).thenReturn(partitionedRegion);
+ doThrow(expectedException).when(partitionedRegion).destroyRemotely(remoteTransactionHost, 1,
+ event, expectedObject);
+
+ Throwable caughtException = catchThrowable(() -> stub.destroyExistingEntry(event, true,
+ expectedObject));
+
+ assertThat(caughtException).isSameAs(expectedException);
+ verify(stub, never()).trackBucketForTx(keyInfo);
+ }
+
+ @Test
+ public void destroyExistingEntryThrowsTransactionDataRebalancedExceptionIfIsBucketNotFoundException()
+ throws Exception {
+ PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+ when(event.getRegion()).thenReturn(partitionedRegion);
+ ForceReattemptException forceReattemptException = mock(ForceReattemptException.class);
+ doReturn(true).when(stub).isBucketNotFoundException(forceReattemptException);
+ doNothing().when(stub).waitToRetry();
+ doThrow(forceReattemptException).when(partitionedRegion).destroyRemotely(remoteTransactionHost,
+ 1, event, expectedObject);
+
+ Throwable caughtException = catchThrowable(() -> stub.destroyExistingEntry(event, false,
+ expectedObject));
+
+ assertThat(caughtException).isInstanceOf(TransactionDataRebalancedException.class);
+ verify(stub, never()).trackBucketForTx(keyInfo);
+ }
+
+ @Test
+ public void destroyExistingEntryThrowsTransactionDataNodeHasDepartedExceptionIfIsNotBucketNotFoundException()
+ throws Exception {
+ PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+ when(event.getRegion()).thenReturn(partitionedRegion);
+ ForceReattemptException forceReattemptException = mock(ForceReattemptException.class);
+ doReturn(false).when(stub).isBucketNotFoundException(forceReattemptException);
+ doNothing().when(stub).waitToRetry();
+ doThrow(forceReattemptException).when(partitionedRegion).destroyRemotely(remoteTransactionHost,
+ 1, event, expectedObject);
+
+ Throwable caughtException = catchThrowable(() -> stub.destroyExistingEntry(event, true,
+ expectedObject));
+
+ assertThat(caughtException).isInstanceOf(TransactionDataNodeHasDepartedException.class);
+ verify(stub, never()).trackBucketForTx(keyInfo);
+ }
+
+ @Test
+ public void getEntryReturnsEntryGotFromRemote() throws Exception {
+ PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+ EntrySnapshot entry = mock(EntrySnapshot.class);
+ when(event.getRegion()).thenReturn(partitionedRegion);
+ when(partitionedRegion.getEntryRemotely((InternalDistributedMember) remoteTransactionHost, 1,
+ key, false, true)).thenReturn((entry));
+
+ assertThat(stub.getEntry(keyInfo, true)).isEqualTo(entry);
+ verify(stub).trackBucketForTx(keyInfo);
+ }
+
+ @Test
+ public void getEntryThrowsTransactionExceptionFromRemoteHost() throws Exception {
+ PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+ when(event.getRegion()).thenReturn(partitionedRegion);
+ doThrow(expectedException).when(partitionedRegion)
+ .getEntryRemotely((InternalDistributedMember) remoteTransactionHost, 1, key, false, true);
+
+ Throwable caughtException = catchThrowable(() -> stub.getEntry(keyInfo, true));
+
+ assertThat(caughtException).isSameAs(expectedException);
+ verify(stub, never()).trackBucketForTx(keyInfo);
+ }
+
+ @Test
+ public void getEntryThrowsTransactionDataRebalancedExceptionIfIsBucketNotFoundException()
+ throws Exception {
+ PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+ when(event.getRegion()).thenReturn(partitionedRegion);
+ ForceReattemptException forceReattemptException = mock(ForceReattemptException.class);
+ doReturn(true).when(stub).isBucketNotFoundException(forceReattemptException);
+ doNothing().when(stub).waitToRetry();
+ doThrow(forceReattemptException).when(partitionedRegion)
+ .getEntryRemotely((InternalDistributedMember) remoteTransactionHost, 1, key, false, true);
+
+ Throwable caughtException = catchThrowable(() -> stub.getEntry(keyInfo, true));
+
+ assertThat(caughtException).isInstanceOf(TransactionDataRebalancedException.class);
+ verify(stub, never()).trackBucketForTx(keyInfo);
+ }
+
+ @Test
+ public void getEntryThrowsTransactionDataNodeHasDepartedExceptionIfIsNotBucketNotFoundException()
+ throws Exception {
+ PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+ when(event.getRegion()).thenReturn(partitionedRegion);
+
+ ForceReattemptException forceReattemptException = mock(ForceReattemptException.class);
+ doReturn(false).when(stub).isBucketNotFoundException(forceReattemptException);
+ doNothing().when(stub).waitToRetry();
+ doThrow(forceReattemptException).when(partitionedRegion)
+ .getEntryRemotely((InternalDistributedMember) remoteTransactionHost, 1, key, false, false);
+
+ Throwable caughtException = catchThrowable(() -> stub.getEntry(keyInfo, false));
+
+ assertThat(caughtException).isInstanceOf(TransactionDataNodeHasDepartedException.class);
+ verify(stub, never()).trackBucketForTx(keyInfo);
+ }
+
+ @Test
+ public void invalidateExistingEntryTracksBucketForTx() {
+ PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+ when(event.getRegion()).thenReturn(partitionedRegion);
+
+ stub.invalidateExistingEntry(event, true, false);
+
+ verify(stub).trackBucketForTx(keyInfo);
+ }
+
+ @Test
+ public void invalidateExistingEntryThrowsTransactionExceptionFromRemoteHost() throws Exception {
+ PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+ when(event.getRegion()).thenReturn(partitionedRegion);
+ when(keyInfo.getBucketId()).thenReturn(1);
+ doThrow(expectedException).when(partitionedRegion).invalidateRemotely(remoteTransactionHost, 1,
+ event);
+
+ Throwable caughtException =
+ catchThrowable(() -> stub.invalidateExistingEntry(event, false, false));
+
+ assertThat(caughtException).isSameAs(expectedException);
+ verify(stub, never()).trackBucketForTx(keyInfo);
+ }
+
+ @Test
+ public void invalidateExistingEntryThrowsTransactionDataRebalancedExceptionIfIsBucketNotFoundException()
+ throws Exception {
+ PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+ when(event.getRegion()).thenReturn(partitionedRegion);
+ when(keyInfo.getBucketId()).thenReturn(1);
+ ForceReattemptException forceReattemptException = mock(ForceReattemptException.class);
+ doReturn(true).when(stub).isBucketNotFoundException(forceReattemptException);
+ doNothing().when(stub).waitToRetry();
+ doThrow(forceReattemptException).when(partitionedRegion)
+ .invalidateRemotely(remoteTransactionHost, 1, event);
+
+ Throwable caughtException =
+ catchThrowable(() -> stub.invalidateExistingEntry(event, false, false));
+
+ assertThat(caughtException).isInstanceOf(TransactionDataRebalancedException.class);
+ verify(stub, never()).trackBucketForTx(keyInfo);
+ }
+
+ @Test
+ public void invalidateExistingEntryThrowsTransactionDataNodeHasDepartedExceptionIfIsNotBucketNotFoundException()
+ throws Exception {
+ PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+ when(event.getRegion()).thenReturn(partitionedRegion);
+ when(keyInfo.getBucketId()).thenReturn(1);
+ ForceReattemptException forceReattemptException = mock(ForceReattemptException.class);
+ doReturn(false).when(stub).isBucketNotFoundException(forceReattemptException);
+ doNothing().when(stub).waitToRetry();
+ doThrow(forceReattemptException).when(partitionedRegion)
+ .invalidateRemotely(remoteTransactionHost, 1, event);
+
+ Throwable caughtException =
+ catchThrowable(() -> stub.invalidateExistingEntry(event, false, false));
+
+ assertThat(caughtException).isInstanceOf(TransactionDataNodeHasDepartedException.class);
+ verify(stub, never()).trackBucketForTx(keyInfo);
+ }
+
+ @Test
+ public void containsKeyReturnsTrueIfContainsKeyRemotelyReturnsTrue() throws Exception {
+ PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+ when(event.getRegion()).thenReturn(partitionedRegion);
+ when(partitionedRegion.containsKeyRemotely((InternalDistributedMember) remoteTransactionHost, 1,
+ key)).thenReturn(true);
+
+ assertThat(stub.containsKey(keyInfo)).isTrue();
+ verify(stub).trackBucketForTx(keyInfo);
+ }
+
+ @Test
+ public void containsKeyReturnsFalseIfContainsKeyRemotelyReturnsFalse() throws Exception {
+ PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+ when(event.getRegion()).thenReturn(partitionedRegion);
+ when(partitionedRegion.containsKeyRemotely((InternalDistributedMember) remoteTransactionHost, 1,
+ key)).thenReturn(false);
+
+ assertThat(stub.containsKey(keyInfo)).isFalse();
+ verify(stub).trackBucketForTx(keyInfo);
+ }
+
+ @Test
+ public void containsKeyThrowsTransactionExceptionFromRemoteHost() throws Exception {
+ PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+ when(event.getRegion()).thenReturn(partitionedRegion);
+ when(partitionedRegion.containsKeyRemotely((InternalDistributedMember) remoteTransactionHost, 1,
+ key)).thenThrow(expectedException);
+
+ Throwable caughtException = catchThrowable(() -> stub.containsKey(keyInfo));
+
+ assertThat(caughtException).isSameAs(expectedException);
+ verify(stub, never()).trackBucketForTx(keyInfo);
+ }
+
+ @Test
+ public void containsKeyThrowsTransactionExceptionIfIsBucketNotFoundException()
+ throws Exception {
+ PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+ when(event.getRegion()).thenReturn(partitionedRegion);
+ ForceReattemptException forceReattemptException = mock(ForceReattemptException.class);
+ doReturn(true).when(stub).isBucketNotFoundException(forceReattemptException);
+ when(partitionedRegion.containsKeyRemotely((InternalDistributedMember) remoteTransactionHost, 1,
+ key)).thenThrow(forceReattemptException);
+ doReturn(expectedException).when(stub).getTransactionException(keyInfo,
+ forceReattemptException);
+
+ Throwable caughtException = catchThrowable(() -> stub.containsKey(keyInfo));
+
+ assertThat(caughtException).isInstanceOf(TransactionException.class);
+ verify(stub).getTransactionException(keyInfo, forceReattemptException);
+ verify(stub, never()).trackBucketForTx(keyInfo);
+ }
+
+ @Test
+ public void containsKeyThrowsTransactionDataNodeHasDepartedExceptionIfIsNotBucketNotFoundException()
+ throws Exception {
+ PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+ when(event.getRegion()).thenReturn(partitionedRegion);
+
+ ForceReattemptException forceReattemptException = mock(ForceReattemptException.class);
+ doNothing().when(stub).waitToRetry();
+ doReturn(false).when(stub).isBucketNotFoundException(forceReattemptException);
+ when(partitionedRegion.containsKeyRemotely((InternalDistributedMember) remoteTransactionHost, 1,
+ key)).thenThrow(forceReattemptException);
+
+ Throwable caughtException = catchThrowable(() -> stub.containsKey(keyInfo));
+
+ assertThat(caughtException).isInstanceOf(TransactionDataNodeHasDepartedException.class);
+ verify(stub, never()).trackBucketForTx(keyInfo);
+ }
+
+ @Test
+ public void containsValueForKeyReturnsTrueIfContainsKeyRemotelyReturnsTrue() throws Exception {
+ PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+ when(event.getRegion()).thenReturn(partitionedRegion);
+ when(partitionedRegion
+ .containsValueForKeyRemotely((InternalDistributedMember) remoteTransactionHost, 1, key))
+ .thenReturn(true);
+
+ assertThat(stub.containsValueForKey(keyInfo)).isTrue();
+ verify(stub).trackBucketForTx(keyInfo);
+ }
+
+ @Test
+ public void containsValueForKeyRemotelyReturnsFalseIfContainsKeyRemotelyReturnsFalse()
+ throws Exception {
+ PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+ when(event.getRegion()).thenReturn(partitionedRegion);
+ when(partitionedRegion
+ .containsValueForKeyRemotely((InternalDistributedMember) remoteTransactionHost, 1, key))
+ .thenReturn(false);
+
+ assertThat(stub.containsValueForKey(keyInfo)).isFalse();
+ verify(stub).trackBucketForTx(keyInfo);
+ }
+
+ @Test
+ public void containsValueForKeyThrowsTransactionExceptionFromRemoteHost() throws Exception {
+ PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+ when(event.getRegion()).thenReturn(partitionedRegion);
+ when(partitionedRegion
+ .containsValueForKeyRemotely((InternalDistributedMember) remoteTransactionHost, 1, key))
+ .thenThrow(expectedException);
+
+ Throwable caughtException = catchThrowable(() -> stub.containsValueForKey(keyInfo));
+
+ assertThat(caughtException).isSameAs(expectedException);
+ verify(stub, never()).trackBucketForTx(keyInfo);
+ }
+
+ @Test
+ public void containsValueForKeyThrowsTransactionExceptionIfIsBucketNotFoundException()
+ throws Exception {
+ PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+ when(event.getRegion()).thenReturn(partitionedRegion);
+ ForceReattemptException forceReattemptException = mock(ForceReattemptException.class);
+ doReturn(true).when(stub).isBucketNotFoundException(forceReattemptException);
+ doReturn(expectedException).when(stub).getTransactionException(keyInfo,
+ forceReattemptException);
+ when(partitionedRegion
+ .containsValueForKeyRemotely((InternalDistributedMember) remoteTransactionHost, 1, key))
+ .thenThrow(forceReattemptException);
+
+ Throwable caughtException = catchThrowable(() -> stub.containsValueForKey(keyInfo));
+
+ assertThat(caughtException).isInstanceOf(TransactionException.class);
+ verify(stub, never()).trackBucketForTx(keyInfo);
+ }
+
+ @Test
+ public void containsValueForKeyThrowsTransactionDataNodeHasDepartedExceptionIfIsNotBucketNotFoundException()
+ throws Exception {
+ PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+ when(event.getRegion()).thenReturn(partitionedRegion);
+
+ ForceReattemptException forceReattemptException = mock(ForceReattemptException.class);
+ doNothing().when(stub).waitToRetry();
+ doReturn(false).when(stub).isBucketNotFoundException(forceReattemptException);
+ when(partitionedRegion
+ .containsValueForKeyRemotely((InternalDistributedMember) remoteTransactionHost, 1, key))
+ .thenThrow(forceReattemptException);
+
+ Throwable caughtException = catchThrowable(() -> stub.containsValueForKey(keyInfo));
+
+ assertThat(caughtException).isInstanceOf(TransactionDataNodeHasDepartedException.class);
+ verify(stub, never()).trackBucketForTx(keyInfo);
+ }
+
+ @Test
+ public void findObjectReturnsObjectFoundFromRemote() throws Exception {
+ PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+ EntrySnapshot entry = mock(EntrySnapshot.class);
+ when(event.getRegion()).thenReturn(partitionedRegion);
+ when(partitionedRegion.getRemotely((InternalDistributedMember) remoteTransactionHost, 1,
+ key, null, true, null, event, false)).thenReturn((expectedObject));
+
+ assertThat(stub.findObject(keyInfo, false, false, null, true, null, event))
+ .isEqualTo(expectedObject);
+ verify(stub).trackBucketForTx(keyInfo);
+ }
+
+ @Test
+ public void findObjectThrowsTransactionExceptionFromRemoteHost() throws Exception {
+ PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub,
+ partitionedRegion));
+ when(event.getRegion()).thenReturn(partitionedRegion);
+ doThrow(expectedException).when(partitionedRegion).getRemotely(
+ (InternalDistributedMember) remoteTransactionHost, 1, key, null, true, null, event, false);
+
+ Throwable caughtException =
+ catchThrowable(() -> stub.findObject(keyInfo, false, false, null, true, null, event));
+
+ assertThat(caughtException).isSameAs(expectedException);
+ verify(stub, never()).trackBucketForTx(keyInfo);
+ }
+
+ @Test
+ public void findObjectThrowsTransactionExceptionIfIsBucketNotFoundException()
+ throws Exception {
+ PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub,
+ partitionedRegion));
+ when(event.getRegion()).thenReturn(partitionedRegion);
+ ForceReattemptException forceReattemptException = mock(ForceReattemptException.class);
+ doReturn(true).when(stub).isBucketNotFoundException(forceReattemptException);
+ doNothing().when(stub).waitToRetry();
+ doThrow(forceReattemptException).when(partitionedRegion)
+ .getRemotely((InternalDistributedMember) remoteTransactionHost, 1, key, null, true, null,
+ event, false);
+ doReturn(expectedException).when(stub).getTransactionException(keyInfo,
+ forceReattemptException);
+
+ Throwable caughtException =
+ catchThrowable(() -> stub.findObject(keyInfo, false, false, null, true, null, event));
+
+ assertThat(caughtException).isInstanceOf(TransactionException.class);
+ verify(stub, never()).trackBucketForTx(keyInfo);
+ }
+
+ @Test
+ public void findObjectThrowsTransactionExceptionIfIsNotBucketNotFoundException()
+ throws Exception {
+ PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub,
+ partitionedRegion));
+ when(event.getRegion()).thenReturn(partitionedRegion);
+
+ ForceReattemptException forceReattemptException = mock(ForceReattemptException.class);
+ doReturn(false).when(stub).isBucketNotFoundException(forceReattemptException);
+ doNothing().when(stub).waitToRetry();
+ doThrow(forceReattemptException).when(partitionedRegion)
+ .getRemotely((InternalDistributedMember) remoteTransactionHost, 1, key, null, true, null,
+ event, false);
+ doReturn(expectedException).when(stub).getTransactionException(keyInfo,
+ forceReattemptException);
+
+ Throwable caughtException =
+ catchThrowable(() -> stub.findObject(keyInfo, false, false, null, true, null, event));
+
+ assertThat(caughtException).isInstanceOf(TransactionException.class);
+ verify(stub, never()).trackBucketForTx(keyInfo);
+ }
+
+ @Test
+ public void putEntryReturnsTrueIfPutRemotelyReturnsTrue() throws Exception {
+ PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+ when(event.getRegion()).thenReturn(partitionedRegion);
+ when(partitionedRegion
+ .putRemotely((InternalDistributedMember) remoteTransactionHost, event, false, false,
+ expectedObject, true))
+ .thenReturn(true);
+
+ assertThat(stub.putEntry(event, false, false, expectedObject, true, 1, false)).isTrue();
+ verify(stub).trackBucketForTx(keyInfo);
+ }
+
+ @Test
+ public void putEntryReturnsFalseIfPutRemotelyReturnsFalse()
+ throws Exception {
+ PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+ when(event.getRegion()).thenReturn(partitionedRegion);
+ when(partitionedRegion
+ .putRemotely((InternalDistributedMember) remoteTransactionHost, event, false, false,
+ expectedObject, true))
+ .thenReturn(false);
+
+ assertThat(stub.putEntry(event, false, false, expectedObject, true, 1, false)).isFalse();
+ verify(stub).trackBucketForTx(keyInfo);
+ }
+
+ /**
+  * The exception thrown by {@code putRemotely} propagates out of {@code putEntry} as the very
+  * same instance, and {@code trackBucketForTx} is never called.
+  */
+ @Test
+ public void putEntryThrowsTransactionExceptionFromRemoteHost() throws Exception {
+   PartitionedTXRegionStub regionStub =
+       spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+   when(event.getRegion()).thenReturn(partitionedRegion);
+   InternalDistributedMember txHost = (InternalDistributedMember) remoteTransactionHost;
+   when(partitionedRegion.putRemotely(txHost, event, true, false, expectedObject, true))
+       .thenThrow(expectedException);
+
+   Throwable thrown = catchThrowable(
+       () -> regionStub.putEntry(event, true, false, expectedObject, true, 1, false));
+
+   assertThat(thrown).isSameAs(expectedException);
+   verify(regionStub, never()).trackBucketForTx(keyInfo);
+ }
+
+ /**
+  * When {@code putRemotely} throws a {@link ForceReattemptException} for which
+  * {@code isBucketNotFoundException} answers {@code true}, {@code putEntry} must fail with a
+  * {@link TransactionException} and must never call {@code trackBucketForTx}.
+  */
+ @Test
+ public void putEntryThrowsTransactionExceptionIfIsBucketNotFoundException()
+     throws Exception {
+   PartitionedTXRegionStub regionStub =
+       spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+   when(event.getRegion()).thenReturn(partitionedRegion);
+
+   ForceReattemptException reattempt = mock(ForceReattemptException.class);
+   InternalDistributedMember txHost = (InternalDistributedMember) remoteTransactionHost;
+   doReturn(true).when(regionStub).isBucketNotFoundException(reattempt);
+   doReturn(expectedException).when(regionStub).getTransactionException(keyInfo, reattempt);
+   when(partitionedRegion.putRemotely(txHost, event, true, false, expectedObject, true))
+       .thenThrow(reattempt);
+
+   Throwable thrown = catchThrowable(
+       () -> regionStub.putEntry(event, true, false, expectedObject, true, 1, false));
+
+   assertThat(thrown).isInstanceOf(TransactionException.class);
+   verify(regionStub, never()).trackBucketForTx(keyInfo);
+ }
+
+ /**
+  * When {@code putRemotely} throws a {@link ForceReattemptException} for which
+  * {@code isBucketNotFoundException} answers {@code false}, {@code putEntry} must fail with a
+  * {@link TransactionException} and must never call {@code trackBucketForTx}.
+  */
+ @Test
+ public void putEntryThrowsTransactionExceptionIfIsNotBucketNotFoundException()
+     throws Exception {
+   PartitionedTXRegionStub regionStub =
+       spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+   when(event.getRegion()).thenReturn(partitionedRegion);
+
+   ForceReattemptException reattempt = mock(ForceReattemptException.class);
+   InternalDistributedMember txHost = (InternalDistributedMember) remoteTransactionHost;
+   doReturn(false).when(regionStub).isBucketNotFoundException(reattempt);
+   // Replace the spy's real waitToRetry() with a no-op.
+   doNothing().when(regionStub).waitToRetry();
+   doReturn(expectedException).when(regionStub).getTransactionException(keyInfo, reattempt);
+   when(partitionedRegion.putRemotely(txHost, event, true, false, expectedObject, true))
+       .thenThrow(reattempt);
+
+   Throwable thrown = catchThrowable(
+       () -> regionStub.putEntry(event, true, false, expectedObject, true, 1, false));
+
+   assertThat(thrown).isInstanceOf(TransactionException.class);
+   verify(regionStub, never()).trackBucketForTx(keyInfo);
+ }
+
+}