You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2018/01/22 21:17:40 UTC

[geode] branch feature/GEODE-4142 updated (eec297f -> 5828c00)

This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a change to branch feature/GEODE-4142
in repository https://gitbox.apache.org/repos/asf/geode.git.


    from eec297f  GEODE-4112: Replaced FunctionAdapter with Function
     new ae5fc65  GEODE-4142: save commit message for failover after jta commit.
     new 5828c00  fix a race: FindRemoteTXMessageReply should not modify tx commit message state as it does not hold the tx lock; instead, client version is modified in TXRemoteCommitMessage when the tx lock is held.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../geode/internal/cache/DistTXCommitMessage.java  | 14 ++--
 .../geode/internal/cache/FindRemoteTXMessage.java  | 12 ++--
 .../internal/cache/JtaAfterCompletionMessage.java  | 12 +++-
 .../internal/cache/JtaBeforeCompletionMessage.java |  6 ++
 .../geode/internal/cache/TXCommitMessage.java      |  4 ++
 .../apache/geode/internal/cache/TXManagerImpl.java | 10 ++-
 .../internal/cache/TXRemoteCommitMessage.java      | 17 +++--
 .../org/apache/geode/internal/cache/TXState.java   |  7 +-
 .../cache/tier/sockets/command/CommitCommand.java  |  5 +-
 .../sockets/command/TXSynchronizationCommand.java  | 27 +++++++-
 ...est.java => JtaAfterCompletionMessageTest.java} | 43 ++++++------
 ...st.java => JtaBeforeCompletionMessageTest.java} | 42 ++++++------
 .../internal/cache/TXRemoteCommitMessageTest.java  | 54 +++++++++++++++
 .../internal/jta/ClientServerJTADUnitTest.java     | 77 +++++++++++++++++++---
 .../codeAnalysis/sanctionedDataSerializables.txt   |  4 +-
 15 files changed, 250 insertions(+), 84 deletions(-)
 copy geode-core/src/test/java/org/apache/geode/internal/cache/{DistributedCacheOperationTest.java => JtaAfterCompletionMessageTest.java} (53%)
 copy geode-core/src/test/java/org/apache/geode/internal/cache/{CacheOperationMessageTest.java => JtaBeforeCompletionMessageTest.java} (53%)
 create mode 100644 geode-core/src/test/java/org/apache/geode/internal/cache/TXRemoteCommitMessageTest.java

-- 
To stop receiving notification emails like this one, please contact
eshu11@apache.org.

[geode] 01/02: GEODE-4142: save commit message for failover after jta commit.

Posted by es...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch feature/GEODE-4142
in repository https://gitbox.apache.org/repos/asf/geode.git

commit ae5fc65c019090cebcc38c939931c1bfd7225730
Author: eshu <es...@pivotal.io>
AuthorDate: Wed Jan 10 12:55:13 2018 -0800

    GEODE-4142: save commit message for failover after jta commit.
    
      check if the jta has completed before retrying the beforeCompletion and afterCompletion in failover cases.
---
 .../geode/internal/cache/DistTXCommitMessage.java  | 14 ++--
 .../internal/cache/JtaAfterCompletionMessage.java  | 11 +++-
 .../internal/cache/JtaBeforeCompletionMessage.java |  6 ++
 .../geode/internal/cache/TXCommitMessage.java      |  4 ++
 .../apache/geode/internal/cache/TXManagerImpl.java | 10 ++-
 .../internal/cache/TXRemoteCommitMessage.java      | 14 ++--
 .../org/apache/geode/internal/cache/TXState.java   |  7 +-
 .../cache/tier/sockets/command/CommitCommand.java  |  5 +-
 .../sockets/command/TXSynchronizationCommand.java  | 27 +++++++-
 .../cache/JtaAfterCompletionMessageTest.java       | 52 +++++++++++++++
 .../cache/JtaBeforeCompletionMessageTest.java      | 51 ++++++++++++++
 .../internal/jta/ClientServerJTADUnitTest.java     | 77 +++++++++++++++++++---
 12 files changed, 245 insertions(+), 33 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java
index 5dc9e7e..e29d10a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java
@@ -76,17 +76,17 @@ public class DistTXCommitMessage extends TXMessage {
     InternalCache cache = dm.getCache();
     TXManagerImpl txMgr = cache.getTXMgr();
     final TXStateProxy txStateProxy = txMgr.getTXState();
-    TXCommitMessage cmsg = null;
+    TXCommitMessage commitMessage = txMgr.getRecentlyCompletedMessage(txId);
     try {
       // do the actual commit, only if it was not done before
-      if (txMgr.isHostedTxRecentlyCompleted(txId)) {
+      if (commitMessage != null) {
         if (logger.isDebugEnabled()) {
           logger.debug(
               "DistTXCommitMessage.operateOnTx: found a previously committed transaction:{}", txId);
         }
-        cmsg = txMgr.getRecentlyCompletedMessage(txId);
-        if (txMgr.isExceptionToken(cmsg)) {
-          throw txMgr.getExceptionForToken(cmsg, txId);
+        commitMessage = txMgr.getRecentlyCompletedMessage(txId);
+        if (txMgr.isExceptionToken(commitMessage)) {
+          throw txMgr.getExceptionForToken(commitMessage, txId);
         }
       } else {
         // [DISTTX] TODO - Handle scenarios of no txState
@@ -125,13 +125,13 @@ public class DistTXCommitMessage extends TXMessage {
 
           txMgr.commit();
 
-          cmsg = txStateProxy.getCommitMessage();
+          commitMessage = txStateProxy.getCommitMessage();
         }
       }
     } finally {
       txMgr.removeHostedTXState(txId);
     }
-    DistTXCommitReplyMessage.send(getSender(), getProcessorId(), cmsg, getReplySender(dm));
+    DistTXCommitReplyMessage.send(getSender(), getProcessorId(), commitMessage, getReplySender(dm));
 
     /*
      * return false so there isn't another reply
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/JtaAfterCompletionMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/JtaAfterCompletionMessage.java
index 645b087..d843ba8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/JtaAfterCompletionMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/JtaAfterCompletionMessage.java
@@ -78,11 +78,18 @@ public class JtaAfterCompletionMessage extends TXMessage {
     if (logger.isDebugEnabled()) {
       logger.debug("JTA: Calling afterCompletion for :{}", txId);
     }
+    TXCommitMessage commitMessage = txMgr.getRecentlyCompletedMessage(txId);
+    if (commitMessage != null) {
+      TXRemoteCommitReplyMessage.send(getSender(), getProcessorId(), commitMessage,
+          getReplySender(dm));
+      return false;
+    }
     TXStateProxy txState = txMgr.getTXState();
     txState.setCommitOnBehalfOfRemoteStub(true);
     txState.afterCompletion(status);
-    TXCommitMessage cmsg = txState.getCommitMessage();
-    TXRemoteCommitReplyMessage.send(getSender(), getProcessorId(), cmsg, getReplySender(dm));
+    commitMessage = txState.getCommitMessage();
+    TXRemoteCommitReplyMessage.send(getSender(), getProcessorId(), commitMessage,
+        getReplySender(dm));
     txMgr.removeHostedTXState(txId);
     return false;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/JtaBeforeCompletionMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/JtaBeforeCompletionMessage.java
index a3f44ef..0727cb0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/JtaBeforeCompletionMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/JtaBeforeCompletionMessage.java
@@ -61,6 +61,12 @@ public class JtaBeforeCompletionMessage extends TXMessage {
     if (logger.isDebugEnabled()) {
       logger.debug("JTA: Calling beforeCompletion for :{}", txId);
     }
+    // Check if jta has been completed, possible due to tx failover.
+    // No need to execute beforeCompletion if already completed.
+    if (txMgr.isHostedTxRecentlyCompleted(txId)) {
+      return true;
+    }
+
     txMgr.getTXState().beforeCompletion();
     return true;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
index fb5b6c5..feed893 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
@@ -145,6 +145,10 @@ public class TXCommitMessage extends PooledDistributionMessage
    * transaction
    */
   public static final TXCommitMessage EXCEPTION_MSG = new TXCommitMessage();
+  /**
+   * A token to be put in TXManagerImpl#failoverMap to represent a rolled back transaction
+   */
+  public static final TXCommitMessage ROLLBACK_MSG = new TXCommitMessage();
 
   public TXCommitMessage(TXId txIdent, DM dm, TXState txState) {
     this.dm = dm;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
index 560629b..85a6e92 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
@@ -1146,9 +1146,11 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
     }
   }
 
-  private void saveTXStateForClientFailover(TXStateProxy tx) {
+  void saveTXStateForClientFailover(TXStateProxy tx) {
     if (tx.isOnBehalfOfClient() && tx.isRealDealLocal()) {
-      failoverMap.put(tx.getTxId(), tx.getCommitMessage());
+      TXCommitMessage commitMessage =
+          tx.getCommitMessage() == null ? TXCommitMessage.ROLLBACK_MSG : tx.getCommitMessage();
+      failoverMap.put(tx.getTxId(), commitMessage);
       if (logger.isDebugEnabled()) {
         logger.debug(
             "TX: storing client initiated transaction:{}; now there are {} entries in the failoverMap",
@@ -1224,7 +1226,9 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
    * @see #isExceptionToken(TXCommitMessage)
    */
   public TXCommitMessage getRecentlyCompletedMessage(TXId txId) {
-    return failoverMap.get(txId);
+    synchronized (failoverMap) {
+      return failoverMap.get(txId);
+    }
   }
 
   /**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteCommitMessage.java
index 8036f92..d9d5cb8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteCommitMessage.java
@@ -80,16 +80,15 @@ public class TXRemoteCommitMessage extends TXMessage {
       logger.debug("TX: Committing: {}", txId);
     }
     final TXStateProxy txState = txMgr.getTXState();
-    TXCommitMessage cmsg = null;
+    TXCommitMessage commitMessage = txMgr.getRecentlyCompletedMessage(txId);
     try {
       // do the actual commit, only if it was not done before
-      if (txMgr.isHostedTxRecentlyCompleted(txId)) {
+      if (commitMessage != null) {
         if (logger.isDebugEnabled()) {
           logger.debug("TX: found a previously committed transaction:{}", txId);
         }
-        cmsg = txMgr.getRecentlyCompletedMessage(txId);
-        if (txMgr.isExceptionToken(cmsg)) {
-          throw txMgr.getExceptionForToken(cmsg, txId);
+        if (txMgr.isExceptionToken(commitMessage)) {
+          throw txMgr.getExceptionForToken(commitMessage, txId);
         }
       } else {
         // if no TXState was created (e.g. due to only getEntry/size operations
@@ -97,13 +96,14 @@ public class TXRemoteCommitMessage extends TXMessage {
         if (txState != null) {
           txState.setCommitOnBehalfOfRemoteStub(true);
           txMgr.commit();
-          cmsg = txState.getCommitMessage();
+          commitMessage = txState.getCommitMessage();
         }
       }
     } finally {
       txMgr.removeHostedTXState(txId);
     }
-    TXRemoteCommitReplyMessage.send(getSender(), getProcessorId(), cmsg, getReplySender(dm));
+    TXRemoteCommitReplyMessage.send(getSender(), getProcessorId(), commitMessage,
+        getReplySender(dm));
 
     /*
      * return false so there isn't another reply
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index b7f4c3e..6462e3b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -1054,8 +1054,9 @@ public class TXState implements TXStateInterface {
         Assert.assertTrue(this.locks != null,
             "Gemfire Transaction afterCompletion called with illegal state.");
         try {
-          this.proxy.getTxMgr().setTXState(null);
+          proxy.getTxMgr().setTXState(null);
           commit();
+          saveTXCommitMessageForClientFailover();
         } catch (CommitConflictException error) {
           Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId()
               + " afterCompletion failed.due to CommitConflictException: " + error);
@@ -1069,6 +1070,7 @@ public class TXState implements TXStateInterface {
         this.jtaLifeTime = opStart - getBeginTime();
         this.proxy.getTxMgr().setTXState(null);
         rollback();
+        saveTXCommitMessageForClientFailover();
         this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this);
         break;
       default:
@@ -1077,6 +1079,9 @@ public class TXState implements TXStateInterface {
     // System.err.println("end afterCompletion");
   }
 
+  private void saveTXCommitMessageForClientFailover() {
+    proxy.getTxMgr().saveTXStateForClientFailover(proxy);
+  }
 
 
   /**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
index bcdc632..2ab92e4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
@@ -56,9 +56,8 @@ public class CommitCommand extends BaseCommand {
         (InternalDistributedMember) serverConnection.getProxyID().getDistributedMember();
     int uniqId = clientMessage.getTransactionId();
     TXId txId = new TXId(client, uniqId);
-    TXCommitMessage commitMsg = null;
-    if (txMgr.isHostedTxRecentlyCompleted(txId)) {
-      commitMsg = txMgr.getRecentlyCompletedMessage(txId);
+    TXCommitMessage commitMsg = txMgr.getRecentlyCompletedMessage(txId);
+    if (commitMsg != null) {
       if (logger.isDebugEnabled()) {
         logger.debug("TX: returning a recently committed txMessage for tx: {}", txId);
       }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
index b7b90d2..c1fb616 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
@@ -25,6 +25,7 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.ReplyException;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.TXCommitMessage;
+import org.apache.geode.internal.cache.TXId;
 import org.apache.geode.internal.cache.TXManagerImpl;
 import org.apache.geode.internal.cache.TXStateProxy;
 import org.apache.geode.internal.cache.TXSynchronizationRunnable;
@@ -72,7 +73,7 @@ public class TXSynchronizationCommand extends BaseCommand {
   public void cmdExecute(final Message clientMessage, final ServerConnection serverConnection,
       final SecurityService securityService, long start)
       throws IOException, ClassNotFoundException, InterruptedException {
-
+    final boolean isDebugEnabled = logger.isDebugEnabled();
     serverConnection.setAsTrue(REQUIRES_RESPONSE);
 
     CompletionType type = CompletionType.values()[clientMessage.getPart(0).getInt()];
@@ -93,11 +94,33 @@ public class TXSynchronizationCommand extends BaseCommand {
     // get the tx state without associating it with this thread. That's done later
     final TXStateProxy txProxy = txMgr.masqueradeAs(clientMessage, member, true);
 
+    final TXId txId = txProxy.getTxId();
+    TXCommitMessage commitMessage = txMgr.getRecentlyCompletedMessage(txId);
+    if (commitMessage != null) {
+      assert type == CompletionType.AFTER_COMPLETION;
+      try {
+        CommitCommand.writeCommitResponse(commitMessage, clientMessage, serverConnection);
+      } catch (IOException e) {
+        if (isDebugEnabled) {
+          logger.debug("Problem writing reply to client", e);
+        }
+      } catch (RuntimeException e) {
+        try {
+          writeException(clientMessage, e, false, serverConnection);
+        } catch (IOException ioe) {
+          if (isDebugEnabled) {
+            logger.debug("Problem writing reply to client", ioe);
+          }
+        }
+      }
+      serverConnection.setAsTrue(RESPONDED);
+      return;
+    }
+
     // we have to run beforeCompletion and afterCompletion in the same thread
     // because beforeCompletion obtains locks for the thread and afterCompletion
     // releases them
     if (txProxy != null) {
-      final boolean isDebugEnabled = logger.isDebugEnabled();
       try {
         if (type == CompletionType.BEFORE_COMPLETION) {
           Runnable beforeCompletion = new Runnable() {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/JtaAfterCompletionMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/JtaAfterCompletionMessageTest.java
new file mode 100644
index 0000000..1efa0a4
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/JtaAfterCompletionMessageTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class JtaAfterCompletionMessageTest {
+  @Test
+  public void testAfterCompletionNotInvokedIfJTACompleted() throws Exception {
+    InternalCache cache = mock(InternalCache.class);
+    TXManagerImpl txMgr = mock(TXManagerImpl.class);
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    TXId txId = mock(TXId.class);
+
+    when(distributionManager.getCache()).thenReturn(cache);
+    when(cache.getTXMgr()).thenReturn(txMgr);
+    when(txMgr.getRecentlyCompletedMessage(txId)).thenReturn(mock(TXCommitMessage.class));
+    when(txMgr.getTXState()).thenReturn(mock(TXStateProxyImpl.class));
+
+    JtaAfterCompletionMessage message = new JtaAfterCompletionMessage();
+    JtaAfterCompletionMessage spyMessage = spy(message);
+    when(spyMessage.getSender()).thenReturn(mock(InternalDistributedMember.class));
+
+    spyMessage.operateOnTx(txId, distributionManager);
+    verify(txMgr, never()).getTXState();
+  }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/JtaBeforeCompletionMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/JtaBeforeCompletionMessageTest.java
new file mode 100644
index 0000000..7b7e6ab
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/JtaBeforeCompletionMessageTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.ReplyProcessor21;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class JtaBeforeCompletionMessageTest {
+  @Test
+  public void testBeforeCompletionNotInvokedIfJTACompleted() throws Exception {
+    InternalCache cache = mock(InternalCache.class);
+    TXManagerImpl txMgr = mock(TXManagerImpl.class);
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    TXId txId = mock(TXId.class);
+
+    when(distributionManager.getCache()).thenReturn(cache);
+    when(cache.getTXMgr()).thenReturn(txMgr);
+    when(txMgr.isHostedTxRecentlyCompleted(txId)).thenReturn(true);
+    when(txMgr.getTXState()).thenReturn(mock(TXStateProxyImpl.class));
+
+    JtaBeforeCompletionMessage message = new JtaBeforeCompletionMessage(1,
+        mock(InternalDistributedMember.class), mock(ReplyProcessor21.class));
+
+    message.operateOnTx(txId, distributionManager);
+    verify(txMgr, never()).getTXState();
+  }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
index 835fb7e..d739010 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import javax.transaction.Status;
+import javax.transaction.TransactionManager;
 
 import org.awaitility.Awaitility;
 import org.junit.Test;
@@ -36,7 +37,10 @@ import org.apache.geode.cache.client.ClientCacheFactory;
 import org.apache.geode.cache.client.ClientRegionShortcut;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.cache.TXCommitMessage;
+import org.apache.geode.internal.cache.TXId;
 import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXStateProxy;
 import org.apache.geode.internal.cache.TXStateProxyImpl;
 import org.apache.geode.internal.cache.tx.ClientTXStateStub;
 import org.apache.geode.internal.logging.LogService;
@@ -61,14 +65,7 @@ public class ClientServerJTADUnitTest extends JUnit4CacheTestCase {
     getBlackboard().initBlackboard();
     final Properties properties = getDistributedSystemProperties();
 
-    final int port = server.invoke("create cache", () -> {
-      Cache cache = getCache(properties);
-      CacheServer cacheServer = createCacheServer(cache, 0);
-      Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
-      region.put(key, value);
-
-      return cacheServer.getPort();
-    });
+    final int port = server.invoke(() -> createServerRegion(regionName, properties));
 
     client.invoke(() -> createClientRegion(host, port, regionName));
 
@@ -193,4 +190,68 @@ public class ClientServerJTADUnitTest extends JUnit4CacheTestCase {
     }
     assertTrue(region.get(key).equals(newValue));
   }
+
+  @Test
+  public void testClientCompletedJTAIsInFailoverMap() throws Exception {
+    final String regionName = getUniqueName();
+    final Properties properties = getDistributedSystemProperties();
+
+    final int port = server.invoke(() -> createServerRegion(regionName, properties));
+
+    createClientRegion(host, port, regionName);
+
+    Region region = getCache().getRegion(regionName);
+    assertTrue(region.get(key).equals(value));
+
+    TransactionManager JTAManager =
+        (TransactionManager) getCache().getJNDIContext().lookup("java:/TransactionManager");
+    assertNotNull(JTAManager);
+
+    // commit
+    JTAManager.begin();
+    region.put(key, newValue);
+    final TXId committedTXId = getTxId();
+    JTAManager.commit();
+    assertTrue(region.get(key).equals(newValue));
+
+    server.invoke(() -> verifyJTAIsCompleted(properties, committedTXId));
+
+    // rollback
+    JTAManager.begin();
+    region.put(key, "UncommittedValue");
+    final TXId rolledBackTXId = getTxId();
+    JTAManager.rollback();
+    assertTrue(region.get(key).equals(newValue));
+
+    server.invoke(() -> verifyJTAIsRollback(properties, rolledBackTXId));
+  }
+
+  private Integer createServerRegion(String regionName, Properties properties) {
+    Cache cache = getCache(properties);
+    CacheServer cacheServer = createCacheServer(cache, 0);
+    Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
+    region.put(key, value);
+
+    return cacheServer.getPort();
+  }
+
+  private TXId getTxId() {
+    TXManagerImpl txManager = (TXManagerImpl) getCache().getCacheTransactionManager();
+    TXStateProxy txStateProxy = txManager.getTXState();
+    return txStateProxy.getTxId();
+  }
+
+  private void verifyJTAIsCompleted(Properties properties, TXId committedTXId) {
+    Cache cache = getCache(properties);
+    assertTrue(((TXManagerImpl) cache.getCacheTransactionManager())
+        .isHostedTxRecentlyCompleted(committedTXId));
+  }
+
+  private void verifyJTAIsRollback(Properties properties, TXId rollbackTXId) {
+    Cache cache = getCache(properties);
+    assertEquals(TXCommitMessage.ROLLBACK_MSG, ((TXManagerImpl) cache.getCacheTransactionManager())
+        .getRecentlyCompletedMessage(rollbackTXId));
+
+  }
+
 }

-- 
To stop receiving notification emails like this one, please contact
eshu11@apache.org.

[geode] 02/02: fix a race: FindRemoteTXMessageReply should not modify tx commit message state as it does not hold the tx lock; instead, client version is modified in TXRemoteCommitMessage when the tx lock is held.

Posted by es...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch feature/GEODE-4142
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 5828c00f6d67066882a5edd913735eaa42203936
Author: eshu <es...@pivotal.io>
AuthorDate: Mon Jan 22 13:10:03 2018 -0800

    fix a race: FindRemoteTXMessageReply should not modify tx commit message state as it does not hold the tx lock;
    instead, client version is modified in TXRemoteCommitMessage when the tx lock is held.
---
 .../geode/internal/cache/FindRemoteTXMessage.java  | 12 ++---
 .../internal/cache/JtaAfterCompletionMessage.java  |  5 +-
 .../internal/cache/TXRemoteCommitMessage.java      |  3 ++
 .../sockets/command/TXSynchronizationCommand.java  |  2 +-
 .../internal/cache/TXRemoteCommitMessageTest.java  | 54 ++++++++++++++++++++++
 .../codeAnalysis/sanctionedDataSerializables.txt   |  4 +-
 6 files changed, 68 insertions(+), 12 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/FindRemoteTXMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/FindRemoteTXMessage.java
index bedd04d..7afb9db 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/FindRemoteTXMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/FindRemoteTXMessage.java
@@ -255,14 +255,12 @@ public class FindRemoteTXMessage extends HighPriorityDistributionMessage
     public void toData(DataOutput out) throws IOException {
       super.toData(out);
       out.writeBoolean(this.isHostingTx);
-      boolean sendTXCommitMessage = this.txCommitMessage != null;
+      // Do not send TxCommitMessage for TXFailover.
+      // FindRemoteTXMessage does not hold the tx lock and should not
+      // modify TXCommitMessage state to cause race condition.
+      // This will force a retry and use the TXRemoteCommitMessage to get the commit message.
+      boolean sendTXCommitMessage = false;
       out.writeBoolean(sendTXCommitMessage);
-      if (sendTXCommitMessage) {
-        out.writeBoolean(this.isPartialCommitMessage);
-        // since this message is going to a peer, reset client version
-        txCommitMessage.setClientVersion(null); // fixes bug 46529
-        InternalDataSerializer.writeDSFID(txCommitMessage, out);
-      }
     }
 
     @Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/JtaAfterCompletionMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/JtaAfterCompletionMessage.java
index d843ba8..96b8d33 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/JtaAfterCompletionMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/JtaAfterCompletionMessage.java
@@ -80,8 +80,9 @@ public class JtaAfterCompletionMessage extends TXMessage {
     }
     TXCommitMessage commitMessage = txMgr.getRecentlyCompletedMessage(txId);
     if (commitMessage != null) {
-      TXRemoteCommitReplyMessage.send(getSender(), getProcessorId(), commitMessage,
-          getReplySender(dm));
+      TXCommitMessage message =
+          commitMessage == TXCommitMessage.ROLLBACK_MSG ? null : commitMessage;
+      TXRemoteCommitReplyMessage.send(getSender(), getProcessorId(), message, getReplySender(dm));
       return false;
     }
     TXStateProxy txState = txMgr.getTXState();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteCommitMessage.java
index d9d5cb8..7960fb2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteCommitMessage.java
@@ -171,6 +171,9 @@ public class TXRemoteCommitMessage extends TXMessage {
     public static void send(InternalDistributedMember recipient, int processorId,
         TXCommitMessage val, ReplySender replySender) throws RemoteOperationException {
       Assert.assertTrue(recipient != null, "TXRemoteCommitReply NULL reply message");
+      if (val != null) {
+        val.setClientVersion(null);
+      }
       TXRemoteCommitReplyMessage m = new TXRemoteCommitReplyMessage(processorId, val);
       m.setRecipient(recipient);
       replySender.putOutgoing(m);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
index c1fb616..2671e4e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
@@ -96,7 +96,7 @@ public class TXSynchronizationCommand extends BaseCommand {
 
     final TXId txId = txProxy.getTxId();
     TXCommitMessage commitMessage = txMgr.getRecentlyCompletedMessage(txId);
-    if (commitMessage != null) {
+    if (commitMessage != null && commitMessage != TXCommitMessage.ROLLBACK_MSG) {
       assert type == CompletionType.AFTER_COMPLETION;
       try {
         CommitCommand.writeCommitResponse(commitMessage, clientMessage, serverConnection);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXRemoteCommitMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXRemoteCommitMessageTest.java
new file mode 100644
index 0000000..2724499
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXRemoteCommitMessageTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache;
+
+import static org.junit.Assert.*;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.internal.ByteArrayData;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class TXRemoteCommitMessageTest {
+  // Round-trips a FindRemoteTXMessageReply through toData/fromData and checks what survives.
+  @Test
+  public void testFindRemoteTXMessageReplyDoesNotSendCommitMessage() throws Exception {
+    FindRemoteTXMessage.FindRemoteTXMessageReply findRemoteTXMessageReply =
+        new FindRemoteTXMessage.FindRemoteTXMessageReply();
+    findRemoteTXMessageReply.isHostingTx = true;
+    findRemoteTXMessageReply.txCommitMessage = new TXCommitMessage();
+    // The in-memory buffer that will receive the serialized reply starts out empty.
+    ByteArrayData testStream = new ByteArrayData();
+    assertTrue(testStream.isEmpty());
+    // Write the reply out; afterwards the buffer must no longer be empty.
+    DataOutputStream out = testStream.getDataOutput();
+    findRemoteTXMessageReply.toData(out);
+    assertTrue(testStream.size() > 0);
+    // Read the serialized bytes back into a fresh reply instance.
+    DataInput in = testStream.getDataInput();
+    FindRemoteTXMessage.FindRemoteTXMessageReply findRemoteTXMessageReplyFromData =
+        new FindRemoteTXMessage.FindRemoteTXMessageReply();
+    findRemoteTXMessageReplyFromData.fromData(in);
+    // isHostingTx is carried across; the commit message set on the sending side is not.
+    assertEquals(true, findRemoteTXMessageReplyFromData.isHostingTx);
+    assertNull(findRemoteTXMessageReplyFromData.txCommitMessage);
+  }
+}
diff --git a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index e9a08a5..cf2c87a 100644
--- a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1031,8 +1031,8 @@ fromData,27,2a2bb700452a2bb80046c00047b500032a2bb900480100b50004b1
 toData,24,2a2bb700422ab400032bb800432b2ab40004b900440200b1
 
 org/apache/geode/internal/cache/FindRemoteTXMessage$FindRemoteTXMessageReply,2
-fromData,46,2a2bb7000a2a2bb9000b0100b500042bb9000b01009900182a2bb9000b0100b500072a2bb8000cc0000db50006b1
-toData,66,2a2bb700032b2ab40004b9000502002ab40006c6000704a70004033d2b1cb9000502001c99001d2b2ab40007b9000502002ab4000601b600082ab400062bb80009b1
+fromData,46,2a2bb700062a2bb900070100b500042bb9000701009900182a2bb900070100b500082a2bb80009c0000ab5000bb1
+toData,25,2a2bb700032b2ab40004b900050200033d2b1cb900050200b1
 
 org/apache/geode/internal/cache/FindVersionTagOperation$FindVersionTagMessage,2
 fromData,55,2a2bb700232a2bb900240100b500032a2bb900250100b500042abb002659b70027b500052ab400052bb800282a2bb900290100b50006b1

-- 
To stop receiving notification emails like this one, please contact
eshu11@apache.org.