You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2018/01/22 21:17:41 UTC

[geode] 01/02: GEODE-4142: save commit message for failover after jta commit.

This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch feature/GEODE-4142
in repository https://gitbox.apache.org/repos/asf/geode.git

commit ae5fc65c019090cebcc38c939931c1bfd7225730
Author: eshu <es...@pivotal.io>
AuthorDate: Wed Jan 10 12:55:13 2018 -0800

    GEODE-4142: save commit message for failover after jta commit.
    
      Check if the JTA has completed before retrying beforeCompletion and afterCompletion in failover cases.
---
 .../geode/internal/cache/DistTXCommitMessage.java  | 14 ++--
 .../internal/cache/JtaAfterCompletionMessage.java  | 11 +++-
 .../internal/cache/JtaBeforeCompletionMessage.java |  6 ++
 .../geode/internal/cache/TXCommitMessage.java      |  4 ++
 .../apache/geode/internal/cache/TXManagerImpl.java | 10 ++-
 .../internal/cache/TXRemoteCommitMessage.java      | 14 ++--
 .../org/apache/geode/internal/cache/TXState.java   |  7 +-
 .../cache/tier/sockets/command/CommitCommand.java  |  5 +-
 .../sockets/command/TXSynchronizationCommand.java  | 27 +++++++-
 .../cache/JtaAfterCompletionMessageTest.java       | 52 +++++++++++++++
 .../cache/JtaBeforeCompletionMessageTest.java      | 51 ++++++++++++++
 .../internal/jta/ClientServerJTADUnitTest.java     | 77 +++++++++++++++++++---
 12 files changed, 245 insertions(+), 33 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java
index 5dc9e7e..e29d10a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java
@@ -76,17 +76,17 @@ public class DistTXCommitMessage extends TXMessage {
     InternalCache cache = dm.getCache();
     TXManagerImpl txMgr = cache.getTXMgr();
     final TXStateProxy txStateProxy = txMgr.getTXState();
-    TXCommitMessage cmsg = null;
+    TXCommitMessage commitMessage = txMgr.getRecentlyCompletedMessage(txId);
     try {
       // do the actual commit, only if it was not done before
-      if (txMgr.isHostedTxRecentlyCompleted(txId)) {
+      if (commitMessage != null) {
         if (logger.isDebugEnabled()) {
           logger.debug(
               "DistTXCommitMessage.operateOnTx: found a previously committed transaction:{}", txId);
         }
-        cmsg = txMgr.getRecentlyCompletedMessage(txId);
-        if (txMgr.isExceptionToken(cmsg)) {
-          throw txMgr.getExceptionForToken(cmsg, txId);
+        commitMessage = txMgr.getRecentlyCompletedMessage(txId);
+        if (txMgr.isExceptionToken(commitMessage)) {
+          throw txMgr.getExceptionForToken(commitMessage, txId);
         }
       } else {
         // [DISTTX] TODO - Handle scenarios of no txState
@@ -125,13 +125,13 @@ public class DistTXCommitMessage extends TXMessage {
 
           txMgr.commit();
 
-          cmsg = txStateProxy.getCommitMessage();
+          commitMessage = txStateProxy.getCommitMessage();
         }
       }
     } finally {
       txMgr.removeHostedTXState(txId);
     }
-    DistTXCommitReplyMessage.send(getSender(), getProcessorId(), cmsg, getReplySender(dm));
+    DistTXCommitReplyMessage.send(getSender(), getProcessorId(), commitMessage, getReplySender(dm));
 
     /*
      * return false so there isn't another reply
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/JtaAfterCompletionMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/JtaAfterCompletionMessage.java
index 645b087..d843ba8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/JtaAfterCompletionMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/JtaAfterCompletionMessage.java
@@ -78,11 +78,18 @@ public class JtaAfterCompletionMessage extends TXMessage {
     if (logger.isDebugEnabled()) {
       logger.debug("JTA: Calling afterCompletion for :{}", txId);
     }
+    TXCommitMessage commitMessage = txMgr.getRecentlyCompletedMessage(txId);
+    if (commitMessage != null) {
+      TXRemoteCommitReplyMessage.send(getSender(), getProcessorId(), commitMessage,
+          getReplySender(dm));
+      return false;
+    }
     TXStateProxy txState = txMgr.getTXState();
     txState.setCommitOnBehalfOfRemoteStub(true);
     txState.afterCompletion(status);
-    TXCommitMessage cmsg = txState.getCommitMessage();
-    TXRemoteCommitReplyMessage.send(getSender(), getProcessorId(), cmsg, getReplySender(dm));
+    commitMessage = txState.getCommitMessage();
+    TXRemoteCommitReplyMessage.send(getSender(), getProcessorId(), commitMessage,
+        getReplySender(dm));
     txMgr.removeHostedTXState(txId);
     return false;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/JtaBeforeCompletionMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/JtaBeforeCompletionMessage.java
index a3f44ef..0727cb0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/JtaBeforeCompletionMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/JtaBeforeCompletionMessage.java
@@ -61,6 +61,12 @@ public class JtaBeforeCompletionMessage extends TXMessage {
     if (logger.isDebugEnabled()) {
       logger.debug("JTA: Calling beforeCompletion for :{}", txId);
     }
+    // Check if jta has been completed, possible due to tx failover.
+    // No need to execute beforeCompletion if already completed.
+    if (txMgr.isHostedTxRecentlyCompleted(txId)) {
+      return true;
+    }
+
     txMgr.getTXState().beforeCompletion();
     return true;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
index fb5b6c5..feed893 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
@@ -145,6 +145,10 @@ public class TXCommitMessage extends PooledDistributionMessage
    * transaction
    */
   public static final TXCommitMessage EXCEPTION_MSG = new TXCommitMessage();
+  /**
+   * A token to be put in TXManagerImpl#failoverMap to represent a rolled back transaction
+   */
+  public static final TXCommitMessage ROLLBACK_MSG = new TXCommitMessage();
 
   public TXCommitMessage(TXId txIdent, DM dm, TXState txState) {
     this.dm = dm;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
index 560629b..85a6e92 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
@@ -1146,9 +1146,11 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
     }
   }
 
-  private void saveTXStateForClientFailover(TXStateProxy tx) {
+  void saveTXStateForClientFailover(TXStateProxy tx) {
     if (tx.isOnBehalfOfClient() && tx.isRealDealLocal()) {
-      failoverMap.put(tx.getTxId(), tx.getCommitMessage());
+      TXCommitMessage commitMessage =
+          tx.getCommitMessage() == null ? TXCommitMessage.ROLLBACK_MSG : tx.getCommitMessage();
+      failoverMap.put(tx.getTxId(), commitMessage);
       if (logger.isDebugEnabled()) {
         logger.debug(
             "TX: storing client initiated transaction:{}; now there are {} entries in the failoverMap",
@@ -1224,7 +1226,9 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
    * @see #isExceptionToken(TXCommitMessage)
    */
   public TXCommitMessage getRecentlyCompletedMessage(TXId txId) {
-    return failoverMap.get(txId);
+    synchronized (failoverMap) {
+      return failoverMap.get(txId);
+    }
   }
 
   /**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteCommitMessage.java
index 8036f92..d9d5cb8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteCommitMessage.java
@@ -80,16 +80,15 @@ public class TXRemoteCommitMessage extends TXMessage {
       logger.debug("TX: Committing: {}", txId);
     }
     final TXStateProxy txState = txMgr.getTXState();
-    TXCommitMessage cmsg = null;
+    TXCommitMessage commitMessage = txMgr.getRecentlyCompletedMessage(txId);
     try {
       // do the actual commit, only if it was not done before
-      if (txMgr.isHostedTxRecentlyCompleted(txId)) {
+      if (commitMessage != null) {
         if (logger.isDebugEnabled()) {
           logger.debug("TX: found a previously committed transaction:{}", txId);
         }
-        cmsg = txMgr.getRecentlyCompletedMessage(txId);
-        if (txMgr.isExceptionToken(cmsg)) {
-          throw txMgr.getExceptionForToken(cmsg, txId);
+        if (txMgr.isExceptionToken(commitMessage)) {
+          throw txMgr.getExceptionForToken(commitMessage, txId);
         }
       } else {
         // if no TXState was created (e.g. due to only getEntry/size operations
@@ -97,13 +96,14 @@ public class TXRemoteCommitMessage extends TXMessage {
         if (txState != null) {
           txState.setCommitOnBehalfOfRemoteStub(true);
           txMgr.commit();
-          cmsg = txState.getCommitMessage();
+          commitMessage = txState.getCommitMessage();
         }
       }
     } finally {
       txMgr.removeHostedTXState(txId);
     }
-    TXRemoteCommitReplyMessage.send(getSender(), getProcessorId(), cmsg, getReplySender(dm));
+    TXRemoteCommitReplyMessage.send(getSender(), getProcessorId(), commitMessage,
+        getReplySender(dm));
 
     /*
      * return false so there isn't another reply
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index b7f4c3e..6462e3b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -1054,8 +1054,9 @@ public class TXState implements TXStateInterface {
         Assert.assertTrue(this.locks != null,
             "Gemfire Transaction afterCompletion called with illegal state.");
         try {
-          this.proxy.getTxMgr().setTXState(null);
+          proxy.getTxMgr().setTXState(null);
           commit();
+          saveTXCommitMessageForClientFailover();
         } catch (CommitConflictException error) {
           Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId()
               + " afterCompletion failed.due to CommitConflictException: " + error);
@@ -1069,6 +1070,7 @@ public class TXState implements TXStateInterface {
         this.jtaLifeTime = opStart - getBeginTime();
         this.proxy.getTxMgr().setTXState(null);
         rollback();
+        saveTXCommitMessageForClientFailover();
         this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this);
         break;
       default:
@@ -1077,6 +1079,9 @@ public class TXState implements TXStateInterface {
     // System.err.println("end afterCompletion");
   }
 
+  private void saveTXCommitMessageForClientFailover() {
+    proxy.getTxMgr().saveTXStateForClientFailover(proxy);
+  }
 
 
   /**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
index bcdc632..2ab92e4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
@@ -56,9 +56,8 @@ public class CommitCommand extends BaseCommand {
         (InternalDistributedMember) serverConnection.getProxyID().getDistributedMember();
     int uniqId = clientMessage.getTransactionId();
     TXId txId = new TXId(client, uniqId);
-    TXCommitMessage commitMsg = null;
-    if (txMgr.isHostedTxRecentlyCompleted(txId)) {
-      commitMsg = txMgr.getRecentlyCompletedMessage(txId);
+    TXCommitMessage commitMsg = txMgr.getRecentlyCompletedMessage(txId);
+    if (commitMsg != null) {
       if (logger.isDebugEnabled()) {
         logger.debug("TX: returning a recently committed txMessage for tx: {}", txId);
       }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
index b7b90d2..c1fb616 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
@@ -25,6 +25,7 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.ReplyException;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.TXCommitMessage;
+import org.apache.geode.internal.cache.TXId;
 import org.apache.geode.internal.cache.TXManagerImpl;
 import org.apache.geode.internal.cache.TXStateProxy;
 import org.apache.geode.internal.cache.TXSynchronizationRunnable;
@@ -72,7 +73,7 @@ public class TXSynchronizationCommand extends BaseCommand {
   public void cmdExecute(final Message clientMessage, final ServerConnection serverConnection,
       final SecurityService securityService, long start)
       throws IOException, ClassNotFoundException, InterruptedException {
-
+    final boolean isDebugEnabled = logger.isDebugEnabled();
     serverConnection.setAsTrue(REQUIRES_RESPONSE);
 
     CompletionType type = CompletionType.values()[clientMessage.getPart(0).getInt()];
@@ -93,11 +94,33 @@ public class TXSynchronizationCommand extends BaseCommand {
     // get the tx state without associating it with this thread. That's done later
     final TXStateProxy txProxy = txMgr.masqueradeAs(clientMessage, member, true);
 
+    final TXId txId = txProxy.getTxId();
+    TXCommitMessage commitMessage = txMgr.getRecentlyCompletedMessage(txId);
+    if (commitMessage != null) {
+      assert type == CompletionType.AFTER_COMPLETION;
+      try {
+        CommitCommand.writeCommitResponse(commitMessage, clientMessage, serverConnection);
+      } catch (IOException e) {
+        if (isDebugEnabled) {
+          logger.debug("Problem writing reply to client", e);
+        }
+      } catch (RuntimeException e) {
+        try {
+          writeException(clientMessage, e, false, serverConnection);
+        } catch (IOException ioe) {
+          if (isDebugEnabled) {
+            logger.debug("Problem writing reply to client", ioe);
+          }
+        }
+      }
+      serverConnection.setAsTrue(RESPONDED);
+      return;
+    }
+
     // we have to run beforeCompletion and afterCompletion in the same thread
     // because beforeCompletion obtains locks for the thread and afterCompletion
     // releases them
     if (txProxy != null) {
-      final boolean isDebugEnabled = logger.isDebugEnabled();
       try {
         if (type == CompletionType.BEFORE_COMPLETION) {
           Runnable beforeCompletion = new Runnable() {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/JtaAfterCompletionMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/JtaAfterCompletionMessageTest.java
new file mode 100644
index 0000000..1efa0a4
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/JtaAfterCompletionMessageTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class JtaAfterCompletionMessageTest {
+  @Test
+  public void testAfterCompletionNotInvokedIfJTACompleted() throws Exception {
+    InternalCache cache = mock(InternalCache.class);
+    TXManagerImpl txMgr = mock(TXManagerImpl.class);
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    TXId txId = mock(TXId.class);
+
+    when(distributionManager.getCache()).thenReturn(cache);
+    when(cache.getTXMgr()).thenReturn(txMgr);
+    when(txMgr.getRecentlyCompletedMessage(txId)).thenReturn(mock(TXCommitMessage.class));
+    when(txMgr.getTXState()).thenReturn(mock(TXStateProxyImpl.class));
+
+    JtaAfterCompletionMessage message = new JtaAfterCompletionMessage();
+    JtaAfterCompletionMessage spyMessage = spy(message);
+    when(spyMessage.getSender()).thenReturn(mock(InternalDistributedMember.class));
+
+    spyMessage.operateOnTx(txId, distributionManager);
+    verify(txMgr, never()).getTXState();
+  }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/JtaBeforeCompletionMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/JtaBeforeCompletionMessageTest.java
new file mode 100644
index 0000000..7b7e6ab
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/JtaBeforeCompletionMessageTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.ReplyProcessor21;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class JtaBeforeCompletionMessageTest {
+  @Test
+  public void testBeforeCompletionNotInvokedIfJTACompleted() throws Exception {
+    InternalCache cache = mock(InternalCache.class);
+    TXManagerImpl txMgr = mock(TXManagerImpl.class);
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    TXId txId = mock(TXId.class);
+
+    when(distributionManager.getCache()).thenReturn(cache);
+    when(cache.getTXMgr()).thenReturn(txMgr);
+    when(txMgr.isHostedTxRecentlyCompleted(txId)).thenReturn(true);
+    when(txMgr.getTXState()).thenReturn(mock(TXStateProxyImpl.class));
+
+    JtaBeforeCompletionMessage message = new JtaBeforeCompletionMessage(1,
+        mock(InternalDistributedMember.class), mock(ReplyProcessor21.class));
+
+    message.operateOnTx(txId, distributionManager);
+    verify(txMgr, never()).getTXState();
+  }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
index 835fb7e..d739010 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import javax.transaction.Status;
+import javax.transaction.TransactionManager;
 
 import org.awaitility.Awaitility;
 import org.junit.Test;
@@ -36,7 +37,10 @@ import org.apache.geode.cache.client.ClientCacheFactory;
 import org.apache.geode.cache.client.ClientRegionShortcut;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.cache.TXCommitMessage;
+import org.apache.geode.internal.cache.TXId;
 import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXStateProxy;
 import org.apache.geode.internal.cache.TXStateProxyImpl;
 import org.apache.geode.internal.cache.tx.ClientTXStateStub;
 import org.apache.geode.internal.logging.LogService;
@@ -61,14 +65,7 @@ public class ClientServerJTADUnitTest extends JUnit4CacheTestCase {
     getBlackboard().initBlackboard();
     final Properties properties = getDistributedSystemProperties();
 
-    final int port = server.invoke("create cache", () -> {
-      Cache cache = getCache(properties);
-      CacheServer cacheServer = createCacheServer(cache, 0);
-      Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
-      region.put(key, value);
-
-      return cacheServer.getPort();
-    });
+    final int port = server.invoke(() -> createServerRegion(regionName, properties));
 
     client.invoke(() -> createClientRegion(host, port, regionName));
 
@@ -193,4 +190,68 @@ public class ClientServerJTADUnitTest extends JUnit4CacheTestCase {
     }
     assertTrue(region.get(key).equals(newValue));
   }
+
+  @Test
+  public void testClientCompletedJTAIsInFailoverMap() throws Exception {
+    final String regionName = getUniqueName();
+    final Properties properties = getDistributedSystemProperties();
+
+    final int port = server.invoke(() -> createServerRegion(regionName, properties));
+
+    createClientRegion(host, port, regionName);
+
+    Region region = getCache().getRegion(regionName);
+    assertTrue(region.get(key).equals(value));
+
+    TransactionManager JTAManager =
+        (TransactionManager) getCache().getJNDIContext().lookup("java:/TransactionManager");
+    assertNotNull(JTAManager);
+
+    // commit
+    JTAManager.begin();
+    region.put(key, newValue);
+    final TXId committedTXId = getTxId();
+    JTAManager.commit();
+    assertTrue(region.get(key).equals(newValue));
+
+    server.invoke(() -> verifyJTAIsCompleted(properties, committedTXId));
+
+    // rollback
+    JTAManager.begin();
+    region.put(key, "UncommittedValue");
+    final TXId rolledBackTXId = getTxId();
+    JTAManager.rollback();
+    assertTrue(region.get(key).equals(newValue));
+
+    server.invoke(() -> verifyJTAIsRollback(properties, rolledBackTXId));
+  }
+
+  private Integer createServerRegion(String regionName, Properties properties) {
+    Cache cache = getCache(properties);
+    CacheServer cacheServer = createCacheServer(cache, 0);
+    Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
+    region.put(key, value);
+
+    return cacheServer.getPort();
+  }
+
+  private TXId getTxId() {
+    TXManagerImpl txManager = (TXManagerImpl) getCache().getCacheTransactionManager();
+    TXStateProxy txStateProxy = txManager.getTXState();
+    return txStateProxy.getTxId();
+  }
+
+  private void verifyJTAIsCompleted(Properties properties, TXId committedTXId) {
+    Cache cache = getCache(properties);
+    assertTrue(((TXManagerImpl) cache.getCacheTransactionManager())
+        .isHostedTxRecentlyCompleted(committedTXId));
+  }
+
+  private void verifyJTAIsRollback(Properties properties, TXId rollbackTXId) {
+    Cache cache = getCache(properties);
+    assertEquals(TXCommitMessage.ROLLBACK_MSG, ((TXManagerImpl) cache.getCacheTransactionManager())
+        .getRecentlyCompletedMessage(rollbackTXId));
+
+  }
+
 }

-- 
To stop receiving notification emails like this one, please contact
eshu11@apache.org.