You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2018/01/22 21:17:41 UTC
[geode] 01/02: GEODE-4142: save commit message for failover after
jta commit.
This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch feature/GEODE-4142
in repository https://gitbox.apache.org/repos/asf/geode.git
commit ae5fc65c019090cebcc38c939931c1bfd7225730
Author: eshu <es...@pivotal.io>
AuthorDate: Wed Jan 10 12:55:13 2018 -0800
GEODE-4142: save commit message for failover after jta commit.
check if the jta has completed before retrying the beforeCompeltion and afterCompletion in failover cases.
---
.../geode/internal/cache/DistTXCommitMessage.java | 14 ++--
.../internal/cache/JtaAfterCompletionMessage.java | 11 +++-
.../internal/cache/JtaBeforeCompletionMessage.java | 6 ++
.../geode/internal/cache/TXCommitMessage.java | 4 ++
.../apache/geode/internal/cache/TXManagerImpl.java | 10 ++-
.../internal/cache/TXRemoteCommitMessage.java | 14 ++--
.../org/apache/geode/internal/cache/TXState.java | 7 +-
.../cache/tier/sockets/command/CommitCommand.java | 5 +-
.../sockets/command/TXSynchronizationCommand.java | 27 +++++++-
.../cache/JtaAfterCompletionMessageTest.java | 52 +++++++++++++++
.../cache/JtaBeforeCompletionMessageTest.java | 51 ++++++++++++++
.../internal/jta/ClientServerJTADUnitTest.java | 77 +++++++++++++++++++---
12 files changed, 245 insertions(+), 33 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java
index 5dc9e7e..e29d10a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java
@@ -76,17 +76,17 @@ public class DistTXCommitMessage extends TXMessage {
InternalCache cache = dm.getCache();
TXManagerImpl txMgr = cache.getTXMgr();
final TXStateProxy txStateProxy = txMgr.getTXState();
- TXCommitMessage cmsg = null;
+ TXCommitMessage commitMessage = txMgr.getRecentlyCompletedMessage(txId);
try {
// do the actual commit, only if it was not done before
- if (txMgr.isHostedTxRecentlyCompleted(txId)) {
+ if (commitMessage != null) {
if (logger.isDebugEnabled()) {
logger.debug(
"DistTXCommitMessage.operateOnTx: found a previously committed transaction:{}", txId);
}
- cmsg = txMgr.getRecentlyCompletedMessage(txId);
- if (txMgr.isExceptionToken(cmsg)) {
- throw txMgr.getExceptionForToken(cmsg, txId);
+ commitMessage = txMgr.getRecentlyCompletedMessage(txId);
+ if (txMgr.isExceptionToken(commitMessage)) {
+ throw txMgr.getExceptionForToken(commitMessage, txId);
}
} else {
// [DISTTX] TODO - Handle scenarios of no txState
@@ -125,13 +125,13 @@ public class DistTXCommitMessage extends TXMessage {
txMgr.commit();
- cmsg = txStateProxy.getCommitMessage();
+ commitMessage = txStateProxy.getCommitMessage();
}
}
} finally {
txMgr.removeHostedTXState(txId);
}
- DistTXCommitReplyMessage.send(getSender(), getProcessorId(), cmsg, getReplySender(dm));
+ DistTXCommitReplyMessage.send(getSender(), getProcessorId(), commitMessage, getReplySender(dm));
/*
* return false so there isn't another reply
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/JtaAfterCompletionMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/JtaAfterCompletionMessage.java
index 645b087..d843ba8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/JtaAfterCompletionMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/JtaAfterCompletionMessage.java
@@ -78,11 +78,18 @@ public class JtaAfterCompletionMessage extends TXMessage {
if (logger.isDebugEnabled()) {
logger.debug("JTA: Calling afterCompletion for :{}", txId);
}
+ TXCommitMessage commitMessage = txMgr.getRecentlyCompletedMessage(txId);
+ if (commitMessage != null) {
+ TXRemoteCommitReplyMessage.send(getSender(), getProcessorId(), commitMessage,
+ getReplySender(dm));
+ return false;
+ }
TXStateProxy txState = txMgr.getTXState();
txState.setCommitOnBehalfOfRemoteStub(true);
txState.afterCompletion(status);
- TXCommitMessage cmsg = txState.getCommitMessage();
- TXRemoteCommitReplyMessage.send(getSender(), getProcessorId(), cmsg, getReplySender(dm));
+ commitMessage = txState.getCommitMessage();
+ TXRemoteCommitReplyMessage.send(getSender(), getProcessorId(), commitMessage,
+ getReplySender(dm));
txMgr.removeHostedTXState(txId);
return false;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/JtaBeforeCompletionMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/JtaBeforeCompletionMessage.java
index a3f44ef..0727cb0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/JtaBeforeCompletionMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/JtaBeforeCompletionMessage.java
@@ -61,6 +61,12 @@ public class JtaBeforeCompletionMessage extends TXMessage {
if (logger.isDebugEnabled()) {
logger.debug("JTA: Calling beforeCompletion for :{}", txId);
}
+ // Check if jta has been completed, possible due to tx failover.
+ // No need to execute beforeCompletion if already completed.
+ if (txMgr.isHostedTxRecentlyCompleted(txId)) {
+ return true;
+ }
+
txMgr.getTXState().beforeCompletion();
return true;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
index fb5b6c5..feed893 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
@@ -145,6 +145,10 @@ public class TXCommitMessage extends PooledDistributionMessage
* transaction
*/
public static final TXCommitMessage EXCEPTION_MSG = new TXCommitMessage();
+ /**
+ * A token to be put in TXManagerImpl#failoverMap to represent a rolled back transaction
+ */
+ public static final TXCommitMessage ROLLBACK_MSG = new TXCommitMessage();
public TXCommitMessage(TXId txIdent, DM dm, TXState txState) {
this.dm = dm;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
index 560629b..85a6e92 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
@@ -1146,9 +1146,11 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
}
}
- private void saveTXStateForClientFailover(TXStateProxy tx) {
+ void saveTXStateForClientFailover(TXStateProxy tx) {
if (tx.isOnBehalfOfClient() && tx.isRealDealLocal()) {
- failoverMap.put(tx.getTxId(), tx.getCommitMessage());
+ TXCommitMessage commitMessage =
+ tx.getCommitMessage() == null ? TXCommitMessage.ROLLBACK_MSG : tx.getCommitMessage();
+ failoverMap.put(tx.getTxId(), commitMessage);
if (logger.isDebugEnabled()) {
logger.debug(
"TX: storing client initiated transaction:{}; now there are {} entries in the failoverMap",
@@ -1224,7 +1226,9 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
* @see #isExceptionToken(TXCommitMessage)
*/
public TXCommitMessage getRecentlyCompletedMessage(TXId txId) {
- return failoverMap.get(txId);
+ synchronized (failoverMap) {
+ return failoverMap.get(txId);
+ }
}
/**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteCommitMessage.java
index 8036f92..d9d5cb8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteCommitMessage.java
@@ -80,16 +80,15 @@ public class TXRemoteCommitMessage extends TXMessage {
logger.debug("TX: Committing: {}", txId);
}
final TXStateProxy txState = txMgr.getTXState();
- TXCommitMessage cmsg = null;
+ TXCommitMessage commitMessage = txMgr.getRecentlyCompletedMessage(txId);
try {
// do the actual commit, only if it was not done before
- if (txMgr.isHostedTxRecentlyCompleted(txId)) {
+ if (commitMessage != null) {
if (logger.isDebugEnabled()) {
logger.debug("TX: found a previously committed transaction:{}", txId);
}
- cmsg = txMgr.getRecentlyCompletedMessage(txId);
- if (txMgr.isExceptionToken(cmsg)) {
- throw txMgr.getExceptionForToken(cmsg, txId);
+ if (txMgr.isExceptionToken(commitMessage)) {
+ throw txMgr.getExceptionForToken(commitMessage, txId);
}
} else {
// if no TXState was created (e.g. due to only getEntry/size operations
@@ -97,13 +96,14 @@ public class TXRemoteCommitMessage extends TXMessage {
if (txState != null) {
txState.setCommitOnBehalfOfRemoteStub(true);
txMgr.commit();
- cmsg = txState.getCommitMessage();
+ commitMessage = txState.getCommitMessage();
}
}
} finally {
txMgr.removeHostedTXState(txId);
}
- TXRemoteCommitReplyMessage.send(getSender(), getProcessorId(), cmsg, getReplySender(dm));
+ TXRemoteCommitReplyMessage.send(getSender(), getProcessorId(), commitMessage,
+ getReplySender(dm));
/*
* return false so there isn't another reply
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index b7f4c3e..6462e3b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -1054,8 +1054,9 @@ public class TXState implements TXStateInterface {
Assert.assertTrue(this.locks != null,
"Gemfire Transaction afterCompletion called with illegal state.");
try {
- this.proxy.getTxMgr().setTXState(null);
+ proxy.getTxMgr().setTXState(null);
commit();
+ saveTXCommitMessageForClientFailover();
} catch (CommitConflictException error) {
Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId()
+ " afterCompletion failed.due to CommitConflictException: " + error);
@@ -1069,6 +1070,7 @@ public class TXState implements TXStateInterface {
this.jtaLifeTime = opStart - getBeginTime();
this.proxy.getTxMgr().setTXState(null);
rollback();
+ saveTXCommitMessageForClientFailover();
this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this);
break;
default:
@@ -1077,6 +1079,9 @@ public class TXState implements TXStateInterface {
// System.err.println("end afterCompletion");
}
+ private void saveTXCommitMessageForClientFailover() {
+ proxy.getTxMgr().saveTXStateForClientFailover(proxy);
+ }
/**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
index bcdc632..2ab92e4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
@@ -56,9 +56,8 @@ public class CommitCommand extends BaseCommand {
(InternalDistributedMember) serverConnection.getProxyID().getDistributedMember();
int uniqId = clientMessage.getTransactionId();
TXId txId = new TXId(client, uniqId);
- TXCommitMessage commitMsg = null;
- if (txMgr.isHostedTxRecentlyCompleted(txId)) {
- commitMsg = txMgr.getRecentlyCompletedMessage(txId);
+ TXCommitMessage commitMsg = txMgr.getRecentlyCompletedMessage(txId);
+ if (commitMsg != null) {
if (logger.isDebugEnabled()) {
logger.debug("TX: returning a recently committed txMessage for tx: {}", txId);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
index b7b90d2..c1fb616 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
@@ -25,6 +25,7 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.TXCommitMessage;
+import org.apache.geode.internal.cache.TXId;
import org.apache.geode.internal.cache.TXManagerImpl;
import org.apache.geode.internal.cache.TXStateProxy;
import org.apache.geode.internal.cache.TXSynchronizationRunnable;
@@ -72,7 +73,7 @@ public class TXSynchronizationCommand extends BaseCommand {
public void cmdExecute(final Message clientMessage, final ServerConnection serverConnection,
final SecurityService securityService, long start)
throws IOException, ClassNotFoundException, InterruptedException {
-
+ final boolean isDebugEnabled = logger.isDebugEnabled();
serverConnection.setAsTrue(REQUIRES_RESPONSE);
CompletionType type = CompletionType.values()[clientMessage.getPart(0).getInt()];
@@ -93,11 +94,33 @@ public class TXSynchronizationCommand extends BaseCommand {
// get the tx state without associating it with this thread. That's done later
final TXStateProxy txProxy = txMgr.masqueradeAs(clientMessage, member, true);
+ final TXId txId = txProxy.getTxId();
+ TXCommitMessage commitMessage = txMgr.getRecentlyCompletedMessage(txId);
+ if (commitMessage != null) {
+ assert type == CompletionType.AFTER_COMPLETION;
+ try {
+ CommitCommand.writeCommitResponse(commitMessage, clientMessage, serverConnection);
+ } catch (IOException e) {
+ if (isDebugEnabled) {
+ logger.debug("Problem writing reply to client", e);
+ }
+ } catch (RuntimeException e) {
+ try {
+ writeException(clientMessage, e, false, serverConnection);
+ } catch (IOException ioe) {
+ if (isDebugEnabled) {
+ logger.debug("Problem writing reply to client", ioe);
+ }
+ }
+ }
+ serverConnection.setAsTrue(RESPONDED);
+ return;
+ }
+
// we have to run beforeCompletion and afterCompletion in the same thread
// because beforeCompletion obtains locks for the thread and afterCompletion
// releases them
if (txProxy != null) {
- final boolean isDebugEnabled = logger.isDebugEnabled();
try {
if (type == CompletionType.BEFORE_COMPLETION) {
Runnable beforeCompletion = new Runnable() {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/JtaAfterCompletionMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/JtaAfterCompletionMessageTest.java
new file mode 100644
index 0000000..1efa0a4
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/JtaAfterCompletionMessageTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class JtaAfterCompletionMessageTest {
+ @Test
+ public void testAfterCompletionNotInvokedIfJTACompleted() throws Exception {
+ InternalCache cache = mock(InternalCache.class);
+ TXManagerImpl txMgr = mock(TXManagerImpl.class);
+ DistributionManager distributionManager = mock(DistributionManager.class);
+ TXId txId = mock(TXId.class);
+
+ when(distributionManager.getCache()).thenReturn(cache);
+ when(cache.getTXMgr()).thenReturn(txMgr);
+ when(txMgr.getRecentlyCompletedMessage(txId)).thenReturn(mock(TXCommitMessage.class));
+ when(txMgr.getTXState()).thenReturn(mock(TXStateProxyImpl.class));
+
+ JtaAfterCompletionMessage message = new JtaAfterCompletionMessage();
+ JtaAfterCompletionMessage spyMessage = spy(message);
+ when(spyMessage.getSender()).thenReturn(mock(InternalDistributedMember.class));
+
+ spyMessage.operateOnTx(txId, distributionManager);
+ verify(txMgr, never()).getTXState();
+ }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/JtaBeforeCompletionMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/JtaBeforeCompletionMessageTest.java
new file mode 100644
index 0000000..7b7e6ab
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/JtaBeforeCompletionMessageTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.ReplyProcessor21;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class JtaBeforeCompletionMessageTest {
+ @Test
+ public void testBeforeCompletionNotInvokedIfJTACompleted() throws Exception {
+ InternalCache cache = mock(InternalCache.class);
+ TXManagerImpl txMgr = mock(TXManagerImpl.class);
+ DistributionManager distributionManager = mock(DistributionManager.class);
+ TXId txId = mock(TXId.class);
+
+ when(distributionManager.getCache()).thenReturn(cache);
+ when(cache.getTXMgr()).thenReturn(txMgr);
+ when(txMgr.isHostedTxRecentlyCompleted(txId)).thenReturn(true);
+ when(txMgr.getTXState()).thenReturn(mock(TXStateProxyImpl.class));
+
+ JtaBeforeCompletionMessage message = new JtaBeforeCompletionMessage(1,
+ mock(InternalDistributedMember.class), mock(ReplyProcessor21.class));
+
+ message.operateOnTx(txId, distributionManager);
+ verify(txMgr, never()).getTXState();
+ }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
index 835fb7e..d739010 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.transaction.Status;
+import javax.transaction.TransactionManager;
import org.awaitility.Awaitility;
import org.junit.Test;
@@ -36,7 +37,10 @@ import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.cache.TXCommitMessage;
+import org.apache.geode.internal.cache.TXId;
import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXStateProxy;
import org.apache.geode.internal.cache.TXStateProxyImpl;
import org.apache.geode.internal.cache.tx.ClientTXStateStub;
import org.apache.geode.internal.logging.LogService;
@@ -61,14 +65,7 @@ public class ClientServerJTADUnitTest extends JUnit4CacheTestCase {
getBlackboard().initBlackboard();
final Properties properties = getDistributedSystemProperties();
- final int port = server.invoke("create cache", () -> {
- Cache cache = getCache(properties);
- CacheServer cacheServer = createCacheServer(cache, 0);
- Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
- region.put(key, value);
-
- return cacheServer.getPort();
- });
+ final int port = server.invoke(() -> createServerRegion(regionName, properties));
client.invoke(() -> createClientRegion(host, port, regionName));
@@ -193,4 +190,68 @@ public class ClientServerJTADUnitTest extends JUnit4CacheTestCase {
}
assertTrue(region.get(key).equals(newValue));
}
+
+ @Test
+ public void testClientCompletedJTAIsInFailoverMap() throws Exception {
+ final String regionName = getUniqueName();
+ final Properties properties = getDistributedSystemProperties();
+
+ final int port = server.invoke(() -> createServerRegion(regionName, properties));
+
+ createClientRegion(host, port, regionName);
+
+ Region region = getCache().getRegion(regionName);
+ assertTrue(region.get(key).equals(value));
+
+ TransactionManager JTAManager =
+ (TransactionManager) getCache().getJNDIContext().lookup("java:/TransactionManager");
+ assertNotNull(JTAManager);
+
+ // commit
+ JTAManager.begin();
+ region.put(key, newValue);
+ final TXId committedTXId = getTxId();
+ JTAManager.commit();
+ assertTrue(region.get(key).equals(newValue));
+
+ server.invoke(() -> verifyJTAIsCompleted(properties, committedTXId));
+
+ // rollback
+ JTAManager.begin();
+ region.put(key, "UncommittedValue");
+ final TXId rolledBackTXId = getTxId();
+ JTAManager.rollback();
+ assertTrue(region.get(key).equals(newValue));
+
+ server.invoke(() -> verifyJTAIsRollback(properties, rolledBackTXId));
+ }
+
+ private Integer createServerRegion(String regionName, Properties properties) {
+ Cache cache = getCache(properties);
+ CacheServer cacheServer = createCacheServer(cache, 0);
+ Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
+ region.put(key, value);
+
+ return cacheServer.getPort();
+ }
+
+ private TXId getTxId() {
+ TXManagerImpl txManager = (TXManagerImpl) getCache().getCacheTransactionManager();
+ TXStateProxy txStateProxy = txManager.getTXState();
+ return txStateProxy.getTxId();
+ }
+
+ private void verifyJTAIsCompleted(Properties properties, TXId committedTXId) {
+ Cache cache = getCache(properties);
+ assertTrue(((TXManagerImpl) cache.getCacheTransactionManager())
+ .isHostedTxRecentlyCompleted(committedTXId));
+ }
+
+ private void verifyJTAIsRollback(Properties properties, TXId rollbackTXId) {
+ Cache cache = getCache(properties);
+ assertEquals(TXCommitMessage.ROLLBACK_MSG, ((TXManagerImpl) cache.getCacheTransactionManager())
+ .getRecentlyCompletedMessage(rollbackTXId));
+
+ }
+
}
--
To stop receiving notification emails like this one, please contact
eshu11@apache.org.