You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/06/07 21:53:06 UTC

[07/17] incubator-geode git commit: GEODE-1491: Make sure when checking if a transaction is completed from isHostedTxRecentlyCompleted() method, the rolled back transaction is considered as well.

GEODE-1491: When isHostedTxRecentlyCompleted() checks whether a transaction has completed, make sure that rolled-back transactions are considered as well.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/0815e1b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/0815e1b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/0815e1b6

Branch: refs/heads/feature/GEODE-837
Commit: 0815e1b6e9f11881b03780ac730856ee5db36c9c
Parents: 001a4e1
Author: eshu <es...@pivotal.io>
Authored: Mon Jun 6 12:38:25 2016 -0700
Committer: eshu <es...@pivotal.io>
Committed: Mon Jun 6 12:38:25 2016 -0700

----------------------------------------------------------------------
 .../gemfire/internal/cache/TXManagerImpl.java   | 20 +++---
 .../cache/partitioned/PartitionMessage.java     |  2 +-
 .../tier/sockets/command/RollbackCommand.java   |  5 ++
 .../internal/cache/TXManagerImplTest.java       | 70 ++++++++++++++++++--
 4 files changed, 83 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0815e1b6/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
index df0176d..2608878 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
@@ -1021,16 +1021,18 @@ public class TXManagerImpl implements CacheTransactionManager,
   }
   
   public boolean isHostedTxRecentlyCompleted(TXId txId) {
-    // if someone is asking to see if we have the txId, they will come
-    // back and ask for the commit message, this could take a long time
-    // specially when called from TXFailoverCommand, so we move
-    // the txId to the front of the queue
-    TXCommitMessage msg = failoverMap.remove(txId);
-    if (msg != null) {
-      failoverMap.put(txId, msg);
-      return true;
+    synchronized(failoverMap) {
+      if (failoverMap.containsKey(txId)) {
+        // If someone is asking whether we have the txId, they will come
+        // back and ask for the commit message. That could take a long time,
+        // especially when called from TXFailoverCommand, so we refresh the
+        // txId's position in the linked map by removing it and putting it back.
+        TXCommitMessage msg = failoverMap.remove(txId);
+        failoverMap.put(txId, msg);
+        return true;
+      }
+      return false;
     }
-    return false;
   }
   
   

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0815e1b6/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
index 9c54587..c2ab27e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
@@ -186,7 +186,7 @@ public abstract class PartitionMessage extends DistributionMessage implements
    * (non-Javadoc)
    * @see com.gemstone.gemfire.internal.cache.TransactionMessage#getTXOriginatorClient()
    */
-  public final InternalDistributedMember getTXOriginatorClient() {
+  public InternalDistributedMember getTXOriginatorClient() {
     return txMemberId;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0815e1b6/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/RollbackCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/RollbackCommand.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/RollbackCommand.java
index ed7c706..f0316bd 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/RollbackCommand.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/RollbackCommand.java
@@ -63,6 +63,11 @@ public class RollbackCommand extends BaseCommand {
         txId = txState.getTxId();
         txMgr.rollback();
         sendRollbackReply(msg, servConn);
+      } else {
+        // Could not find the TxState on the host server.
+        // Protect against a failover command that is received so late that
+        // the transaction has already been removed from the failoverMap due to capacity.
+        sendRollbackReply(msg, servConn);
       }
     } catch (Exception e) {
       writeException(msg, e, false, servConn);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0815e1b6/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java
index a4b8127..ce24947 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
 import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 import com.gemstone.gemfire.internal.cache.partitioned.DestroyMessage;
 import com.gemstone.gemfire.test.fake.Fakes;
@@ -44,24 +45,29 @@ public class TXManagerImplTest {
   TXId completedTxid;
   TXId notCompletedTxid;
   InternalDistributedMember member;
-  CountDownLatch latch = new CountDownLatch(1);
+  CountDownLatch latch;
   TXStateProxy tx1, tx2;
+  DistributionManager dm;
+  TXRemoteRollbackMessage rollbackMsg;
 
   @Before
   public void setUp() {
     Cache cache = Fakes.cache();
-    txMgr = new TXManagerImpl(null, cache);
+    dm = mock(DistributionManager.class);
+    txMgr = new TXManagerImpl(mock(CachePerfStats.class), cache);
     txid = new TXId(null, 0);
     msg = mock(DestroyMessage.class);    
     txCommitMsg = mock(TXCommitMessage.class);
     member = mock(InternalDistributedMember.class);
     completedTxid = new TXId(member, 1);
     notCompletedTxid = new TXId(member, 2);
+    latch = new CountDownLatch(1);
+    rollbackMsg = new TXRemoteRollbackMessage();
     
     when(this.msg.canStartRemoteTransaction()).thenReturn(true);
     when(this.msg.canParticipateInTransaction()).thenReturn(true);
   }
-  
+
   @Test
   public void getOrSetHostedTXStateAbleToSetTXStateAndGetLock(){    
     TXStateProxy tx = txMgr.getOrSetHostedTXState(txid, msg);
@@ -232,5 +238,61 @@ public class TXManagerImplTest {
     txMgr.saveTXCommitMessageForClientFailover(completedTxid, txCommitMsg); 
     assertTrue(txMgr.hasTxAlreadyFinished(tx, completedTxid));
   }
-  
+
+  @Test
+  public void txRolledbackShouldCompleteTx() throws InterruptedException {
+    when(msg.getTXOriginatorClient()).thenReturn(mock(InternalDistributedMember.class));
+
+    Thread t1 = new Thread(new Runnable() {
+      public void run() {
+        try {
+          tx1 = txMgr.masqueradeAs(msg);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+          throw new RuntimeException(e);
+        }
+        try {
+          msg.process(dm);
+        } finally {
+          txMgr.unmasquerade(tx1);
+        }
+
+        TXStateProxy existingTx = masqueradeToRollback();
+        latch.countDown();
+        Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS).pollDelay(10, TimeUnit.MILLISECONDS)
+        .atMost(30, TimeUnit.SECONDS).until(() -> tx1.getLock().hasQueuedThreads());
+
+        rollbackTransaction(existingTx);
+      }
+    });
+    t1.start();
+
+    assertTrue(latch.await(60, TimeUnit.SECONDS));
+
+    TXStateProxy tx = txMgr.masqueradeAs(rollbackMsg);
+    assertEquals(tx, tx1);
+    t1.join();
+    rollbackTransaction(tx);
+  }
+
+  private TXStateProxy masqueradeToRollback() {
+    TXStateProxy existingTx;
+    try {
+      existingTx = txMgr.masqueradeAs(rollbackMsg);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      throw new RuntimeException(e);
+    }
+    return existingTx;
+  }
+
+  private void rollbackTransaction(TXStateProxy existingTx) {
+    try {
+      if (!txMgr.isHostedTxRecentlyCompleted(txid)) {
+        txMgr.rollback();
+      }
+    } finally {
+      txMgr.unmasquerade(existingTx);
+    }
+  }
 }