You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2016/05/31 21:44:39 UTC

incubator-geode git commit: GEODE-1400: An inflight transaction op could arrive later than a client failover operation

Repository: incubator-geode
Updated Branches:
  refs/heads/develop 8eac0fa8c -> 384d379ae


GEODE-1400: An inflight transaction op could arrive later than a client failover operation

 * Handle inflight p2p transaction message received
   later than failover message.
 * Add unit tests.
 * Move hasTxAlreadyFinished method to TXManagerImpl.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/384d379a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/384d379a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/384d379a

Branch: refs/heads/develop
Commit: 384d379ae7040c8fe222afdeab96973f50157296
Parents: 8eac0fa
Author: eshu <es...@pivotal.io>
Authored: Tue May 24 12:04:52 2016 -0700
Committer: eshu <es...@pivotal.io>
Committed: Tue May 31 14:39:42 2016 -0700

----------------------------------------------------------------------
 .../internal/cache/RemoteOperationMessage.java  |  46 +++-
 .../gemfire/internal/cache/TXManagerImpl.java   |  71 +++++-
 .../cache/partitioned/PartitionMessage.java     |  54 ++++-
 .../cache/RemoteOperationMessageTest.java       |  93 ++++++++
 .../internal/cache/TXManagerImplTest.java       | 236 +++++++++++++++++++
 .../cache/partitioned/PartitionMessageTest.java | 100 ++++++++
 6 files changed, 566 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/384d379a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
index 8ffab72..19e1dea 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
@@ -187,7 +187,7 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
   /**
    * check to see if the cache is closing
    */
-  final public boolean checkCacheClosing(DistributionManager dm) {
+  public boolean checkCacheClosing(DistributionManager dm) {
     GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
     // return (cache != null && cache.isClosed());
     return cache == null || cache.isClosed();
@@ -197,11 +197,11 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
    * check to see if the distributed system is closing
    * @return true if the distributed system is closing
    */
-  final public boolean checkDSClosing(DistributionManager dm) {
+  public boolean checkDSClosing(DistributionManager dm) {
     InternalDistributedSystem ds = dm.getSystem();
     return (ds == null || ds.isDisconnecting());
   }
-  
+
   /**
    * Upon receipt of the message, both process the message and send an
    * acknowledgement, not necessarily in that order. Note: Any hang in this
@@ -222,8 +222,8 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
         thr = new CacheClosedException(LocalizedStrings.PartitionMessage_REMOTE_CACHE_IS_CLOSED_0.toLocalizedString(dm.getId()));
         return;
       }
-      GemFireCacheImpl gfc = (GemFireCacheImpl)CacheFactory.getInstance(dm.getSystem());
-      r = gfc.getRegionByPathForProcessing(this.regionPath);
+      GemFireCacheImpl gfc = getCache(dm);
+      r = getRegionByPath(gfc);
       if (r == null && failIfRegionMissing()) {
         // if the distributed system is disconnecting, don't send a reply saying
         // the partitioned region can't be found (bug 36585)
@@ -235,13 +235,19 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
       thr = UNHANDLED_EXCEPTION;
       
       // [bruce] r might be null here, so we have to go to the cache instance to get the txmgr
-      TXManagerImpl txMgr = GemFireCacheImpl.getInstance().getTxManager();
-      TXStateProxy tx = null;
-      try {
-        tx = txMgr.masqueradeAs(this);
-        sendReply = operateOnRegion(dm, r, startTime);
-      } finally {
-        txMgr.unmasquerade(tx);
+      TXManagerImpl txMgr = getTXManager(gfc);
+      TXStateProxy tx = txMgr.masqueradeAs(this);
+      if (tx == null) {
+        sendReply = operateOnRegion(dm, r, startTime);        
+      } else {
+        try {
+          TXId txid = new TXId(getMemberToMasqueradeAs(), getTXUniqId());
+          if (!hasTxAlreadyFinished(tx, txMgr, txid)) {
+            sendReply = operateOnRegion(dm, r, startTime);       
+          }  
+        } finally {
+          txMgr.unmasquerade(tx);
+        }
       }
       thr = null;
           
@@ -310,6 +316,22 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
       } 
     }
   }
+
+  boolean hasTxAlreadyFinished(TXStateProxy tx, TXManagerImpl txMgr, TXId txid) {
+    return txMgr.hasTxAlreadyFinished(tx, txid);
+  }
+
+  TXManagerImpl getTXManager(GemFireCacheImpl cache) {
+    return cache.getTxManager();
+  }
+
+  LocalRegion getRegionByPath(GemFireCacheImpl gfc) {
+    return gfc.getRegionByPathForProcessing(this.regionPath);
+  }
+
+  GemFireCacheImpl getCache(final DistributionManager dm) {
+    return (GemFireCacheImpl)CacheFactory.getInstance(dm.getSystem());
+  }
   
   /** Send a generic ReplyMessage.  This is in a method so that subclasses can override the reply message type
    * @param pr the Partitioned Region for the message whose statistics are incremented

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/384d379a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
index dd4653c..49926e6 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
@@ -84,7 +84,7 @@ import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMa
  * 
  * @see CacheTransactionManager
  */
-public final class TXManagerImpl implements CacheTransactionManager,
+public class TXManagerImpl implements CacheTransactionManager,
     MembershipListener {
 
   private static final Logger logger = LogService.getLogger();
@@ -729,8 +729,26 @@ public final class TXManagerImpl implements CacheTransactionManager,
       return null;
     }
     TXId key = new TXId(msg.getMemberToMasqueradeAs(), msg.getTXUniqId());
-    TXStateProxy val;
-    val = this.hostedTXStates.get(key);
+    TXStateProxy val = getOrSetHostedTXState(key, msg);
+
+    if (val != null) {
+      boolean success = getLock(val, key);
+      while (!success) {
+        val = getOrSetHostedTXState(key, msg);
+        if (val != null) {
+          success = getLock(val, key);
+        } else {
+          break;
+        }
+      }
+    }
+
+    setTXState(val);
+    return val;
+  }
+
+  TXStateProxy getOrSetHostedTXState(TXId key, TransactionMessage msg) {
+    TXStateProxy val = this.hostedTXStates.get(key);
     if (val == null) {
       synchronized(this.hostedTXStates) {
         val = this.hostedTXStates.get(key);
@@ -746,14 +764,49 @@ public final class TXManagerImpl implements CacheTransactionManager,
         }
       }
     }
-    if (val != null) {
-      if (!val.getLock().isHeldByCurrentThread()) {
-        val.getLock().lock();
+    return val;
+  }
+
+  boolean getLock(TXStateProxy val, TXId key) {
+    if (!val.getLock().isHeldByCurrentThread()) {
+      val.getLock().lock();
+      synchronized (this.hostedTXStates) {
+        TXStateProxy curVal = this.hostedTXStates.get(key);
+        // Inflight op could be received later than TXFailover operation.
+        if (curVal == null) {
+          if (!isHostedTxRecentlyCompleted(key)) {
+            this.hostedTXStates.put(key, val);
+            // Failover op removed the val
+            // It is possible that the same operation can be executed
+            // twice by two threads, but data is consistent.
+          }
+        } else {
+          if (val != curVal) {
+            //Failover op replaced with a new TXStateProxyImpl
+            //Use the new one instead.
+            val.getLock().unlock();
+            return false;
+          }
+        }
       }
     }
-
-    setTXState(val);
-    return val;
+    return true;
+  }
+  
+  public boolean hasTxAlreadyFinished(TXStateProxy tx, TXId txid) {
+    if (tx == null) {
+      return false;
+    }
+    if (isHostedTxRecentlyCompleted(txid)) {
+      //Should only happen when handling a later arrival of transactional op from proxy,
+      //while the transaction has failed over and already committed or rolled back.
+      //Just send back reply as a success op.
+      //The client connection should be lost from proxy, or
+      //the proxy is closed for failover to occur.
+      logger.info("TxId {} has already finished." , txid);
+      return true;
+    }
+    return false;
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/384d379a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
index db4cc59..9c54587 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
@@ -58,6 +58,7 @@ import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 import com.gemstone.gemfire.internal.cache.PartitionedRegion;
 import com.gemstone.gemfire.internal.cache.PartitionedRegionException;
 import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
+import com.gemstone.gemfire.internal.cache.TXId;
 import com.gemstone.gemfire.internal.cache.TXManagerImpl;
 import com.gemstone.gemfire.internal.cache.TXStateProxy;
 import com.gemstone.gemfire.internal.cache.TransactionMessage;
@@ -262,8 +263,8 @@ public abstract class PartitionMessage extends DistributionMessage implements
   /**
    * check to see if the cache is closing
    */
-  final public boolean checkCacheClosing(DistributionManager dm) {
-    GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+  public boolean checkCacheClosing(DistributionManager dm) {
+    GemFireCacheImpl cache = getGemFireCacheImpl();
     // return (cache != null && cache.isClosed());
     return cache == null || cache.isClosed();
   }
@@ -272,11 +273,32 @@ public abstract class PartitionMessage extends DistributionMessage implements
    * check to see if the distributed system is closing
    * @return true if the distributed system is closing
    */
-  final public boolean checkDSClosing(DistributionManager dm) {
+  public boolean checkDSClosing(DistributionManager dm) {
     InternalDistributedSystem ds = dm.getSystem();
     return (ds == null || ds.isDisconnecting());
   }
   
+  boolean hasTxAlreadyFinished(TXStateProxy tx, TXManagerImpl txMgr, TXId txid) {
+    return txMgr.hasTxAlreadyFinished(tx, txid);
+  }
+  
+  PartitionedRegion getPartitionedRegion() throws PRLocallyDestroyedException {
+    return PartitionedRegion.getPRFromId(this.regionId);
+  }
+  
+  GemFireCacheImpl getGemFireCacheImpl() {
+    return GemFireCacheImpl.getInstance();
+  }
+
+  TXManagerImpl getTXManagerImpl(GemFireCacheImpl cache) {
+    return cache.getTxManager();
+  }
+  
+  long getStartPartitionMessageProcessingTime(PartitionedRegion pr) {
+    return pr.getPrStats().startPartitionMessageProcessing();
+  }
+
+  
   /**
    * Upon receipt of the message, both process the message and send an
    * acknowledgement, not necessarily in that order. Note: Any hang in this
@@ -298,7 +320,7 @@ public abstract class PartitionMessage extends DistributionMessage implements
         thr = new CacheClosedException(LocalizedStrings.PartitionMessage_REMOTE_CACHE_IS_CLOSED_0.toLocalizedString(dm.getId()));
         return;
       }
-      pr = PartitionedRegion.getPRFromId(this.regionId);
+      pr = getPartitionedRegion();
       if (pr == null && failIfRegionMissing()) {
         // if the distributed system is disconnecting, don't send a reply saying
         // the partitioned region can't be found (bug 36585)
@@ -307,21 +329,27 @@ public abstract class PartitionMessage extends DistributionMessage implements
       }
 
       if (pr != null) {
-        startTime = pr.getPrStats().startPartitionMessageProcessing();
+        startTime = getStartPartitionMessageProcessingTime(pr);
       }
       thr = UNHANDLED_EXCEPTION;
       
-      GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+      GemFireCacheImpl cache = getGemFireCacheImpl();
       if(cache==null) {
         throw new ForceReattemptException(LocalizedStrings.PartitionMessage_REMOTE_CACHE_IS_CLOSED_0.toLocalizedString());
       }
-      TXManagerImpl txMgr = cache.getTxManager();
-      TXStateProxy tx = null;
-      try {
-        tx = txMgr.masqueradeAs(this);
-        sendReply = operateOnPartitionedRegion(dm, pr, startTime);
-      } finally {
-        txMgr.unmasquerade(tx);
+      TXManagerImpl txMgr = getTXManagerImpl(cache);
+      TXStateProxy tx = txMgr.masqueradeAs(this);
+      if (tx == null) {
+        sendReply = operateOnPartitionedRegion(dm, pr, startTime);        
+      } else {
+        try {
+          TXId txid = new TXId(getMemberToMasqueradeAs(), getTXUniqId());
+          if (!hasTxAlreadyFinished(tx, txMgr, txid)) {
+            sendReply = operateOnPartitionedRegion(dm, pr, startTime);        
+          }  
+        } finally {
+          txMgr.unmasquerade(tx);
+        }
       }
       thr = null;
           

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/384d379a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java
new file mode 100644
index 0000000..ecfc2b0
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import static org.mockito.Mockito.*;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.internal.stubbing.answers.CallsRealMethods;
+
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.TXId;
+import com.gemstone.gemfire.internal.cache.TXManagerImpl;
+import com.gemstone.gemfire.internal.cache.TXStateProxy;
+import com.gemstone.gemfire.internal.cache.TXStateProxyImpl;
+import com.gemstone.gemfire.test.fake.Fakes;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+
+@Category(UnitTest.class)
+public class RemoteOperationMessageTest {
+  private GemFireCacheImpl cache;
+  private RemoteOperationMessage msg;
+  private DistributionManager dm;
+  private LocalRegion r;
+  private TXManagerImpl txMgr;
+  private TXId txid;
+  private long startTime = 0;
+  TXStateProxy tx;
+  
+  @Before
+  public void setUp() throws InterruptedException {
+    cache = Fakes.cache();
+    dm = mock(DistributionManager.class);  
+    msg = mock(RemoteOperationMessage.class);
+    r = mock(LocalRegion.class);
+    txMgr = mock(TXManagerImpl.class);
+    txid = new TXId(null, 0);
+    tx = mock(TXStateProxyImpl.class);
+    
+    when(msg.checkCacheClosing(dm)).thenReturn(false);
+    when(msg.checkDSClosing(dm)).thenReturn(false);
+    when(msg.getCache(dm)).thenReturn(cache);
+    when(msg.getRegionByPath(cache)).thenReturn(r);
+    when(msg.getTXManager(cache)).thenReturn(txMgr);
+    when(txMgr.hasTxAlreadyFinished(tx, txid)).thenCallRealMethod();
+
+    doAnswer(new CallsRealMethods()).when(msg).process(dm);    
+  }
+  
+  @Test
+  public void messageWithNoTXPerformsOnRegion() throws InterruptedException, RemoteOperationException {
+    when(txMgr.masqueradeAs(msg)).thenReturn(null);
+    msg.process(dm);
+
+    verify(msg, times(1)).operateOnRegion(dm, r, startTime);
+  }
+
+  @Test
+  public void messageForNotFinishedTXPerformsOnRegion() throws InterruptedException, RemoteOperationException {
+    when(txMgr.masqueradeAs(msg)).thenReturn(tx);
+    when(msg.hasTxAlreadyFinished(tx, txMgr, txid)).thenCallRealMethod(); 
+    msg.process(dm);
+
+    verify(msg, times(1)).operateOnRegion(dm, r, startTime);
+  }
+  
+  @Test
+  public void messageForFinishedTXDoesNotPerformOnRegion() throws InterruptedException, RemoteOperationException {
+    when(txMgr.masqueradeAs(msg)).thenReturn(tx);
+    when(msg.hasTxAlreadyFinished(tx, txMgr, txid)).thenReturn(true); 
+    msg.process(dm);
+
+    verify(msg, times(0)).operateOnRegion(dm, r, startTime);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/384d379a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java
new file mode 100644
index 0000000..a4b8127
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.internal.cache;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.partitioned.DestroyMessage;
+import com.gemstone.gemfire.test.fake.Fakes;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+import com.jayway.awaitility.Awaitility;
+
+
+@Category(UnitTest.class)
+public class TXManagerImplTest {
+  private TXManagerImpl txMgr;
+  TXId txid;
+  DestroyMessage msg;
+  TXCommitMessage txCommitMsg;
+  TXId completedTxid;
+  TXId notCompletedTxid;
+  InternalDistributedMember member;
+  CountDownLatch latch = new CountDownLatch(1);
+  TXStateProxy tx1, tx2;
+
+  @Before
+  public void setUp() {
+    Cache cache = Fakes.cache();
+    txMgr = new TXManagerImpl(null, cache);
+    txid = new TXId(null, 0);
+    msg = mock(DestroyMessage.class);    
+    txCommitMsg = mock(TXCommitMessage.class);
+    member = mock(InternalDistributedMember.class);
+    completedTxid = new TXId(member, 1);
+    notCompletedTxid = new TXId(member, 2);
+    
+    when(this.msg.canStartRemoteTransaction()).thenReturn(true);
+    when(this.msg.canParticipateInTransaction()).thenReturn(true);
+  }
+  
+  @Test
+  public void getOrSetHostedTXStateAbleToSetTXStateAndGetLock(){    
+    TXStateProxy tx = txMgr.getOrSetHostedTXState(txid, msg);
+    
+    assertNotNull(tx);
+    assertEquals(tx, txMgr.getHostedTXState(txid));  
+    assertTrue(txMgr.getLock(tx, txid));
+  }
+
+  @Test
+  public void getLockAfterTXStateRemoved() throws InterruptedException{
+    TXStateProxy tx = txMgr.getOrSetHostedTXState(txid, msg);
+    
+    assertEquals(tx, txMgr.getHostedTXState(txid));  
+    assertTrue(txMgr.getLock(tx, txid));
+    assertNotNull(tx);
+    assertTrue(txMgr.getLock(tx, txid));
+    tx.getLock().unlock();
+       
+    TXStateProxy oldtx = txMgr.getOrSetHostedTXState(txid, msg);    
+    assertEquals(tx, oldtx);
+    
+    Thread t1 = new Thread(new Runnable() {
+      public void run() {
+        txMgr.removeHostedTXState(txid);
+      }
+    });
+    t1.start();
+    
+    t1.join();
+    
+    TXStateProxy curTx = txMgr.getHostedTXState(txid);
+    assertNull(curTx);
+    
+    //after failover command removed the txid from hostedTXState,
+    //getLock should put back the original TXStateProxy
+    assertTrue(txMgr.getLock(tx, txid));
+    assertEquals(tx, txMgr.getHostedTXState(txid));
+    
+    tx.getLock().unlock();
+  }
+  
+  @Test
+  public void getLockAfterTXStateReplaced() throws InterruptedException{  
+    TXStateProxy oldtx = txMgr.getOrSetHostedTXState(txid, msg);
+    
+    assertEquals(oldtx, txMgr.getHostedTXState(txid));  
+    assertTrue(txMgr.getLock(oldtx, txid));
+    assertNotNull(oldtx);
+    oldtx.getLock().unlock();
+       
+    TXStateProxy tx = txMgr.getOrSetHostedTXState(txid, msg);    
+    assertEquals(tx, oldtx);
+    
+    Thread t1 = new Thread(new Runnable() {
+      public void run() {
+        txMgr.removeHostedTXState(txid);
+        //replace with new TXState
+        txMgr.getOrSetHostedTXState(txid, msg);
+      }
+    });
+    t1.start();
+    
+    t1.join();
+   
+    TXStateProxy curTx = txMgr.getHostedTXState(txid);
+    assertNotNull(curTx);
+    //replaced
+    assertNotEquals(tx, curTx);
+    
+    //after TXStateProxy replaced, getLock will not get 
+    assertFalse(txMgr.getLock(tx, txid));
+    
+  }
+  
+  @Test
+  public void getLockAfterTXStateCommitted() throws InterruptedException{  
+    TXStateProxy oldtx = txMgr.getOrSetHostedTXState(txid, msg);
+    
+    assertEquals(oldtx, txMgr.getHostedTXState(txid));  
+    assertTrue(txMgr.getLock(oldtx, txid));
+    assertNotNull(oldtx);
+    oldtx.getLock().unlock();
+       
+    TXStateProxy tx = txMgr.getOrSetHostedTXState(txid, msg);    
+    assertEquals(tx, oldtx);
+    
+    Thread t1 = new Thread(new Runnable() {
+      public void run() {
+        txMgr.removeHostedTXState(txid);
+        txMgr.saveTXCommitMessageForClientFailover(txid, txCommitMsg);
+      }
+    });
+    t1.start();
+    
+    t1.join();
+   
+    TXStateProxy curTx = txMgr.getHostedTXState(txid);
+    assertNull(curTx);
+    
+    //after TXStateProxy committed, getLock will get the lock for the oldtx
+    //but caller should not perform ops on this TXStateProxy
+    assertTrue(txMgr.getLock(tx, txid));    
+  } 
+  
+  @Test
+  public void masqueradeAsCanGetLock() throws InterruptedException{  
+    TXStateProxy tx;
+
+    tx = txMgr.masqueradeAs(msg);
+    assertNotNull(tx);
+  }
+  
+  @Test
+  public void masqueradeAsCanGetLockAfterTXStateIsReplaced() throws InterruptedException{  
+    TXStateProxy tx;
+    
+    Thread t1 = new Thread(new Runnable() {
+      public void run() {
+        tx1 = txMgr.getHostedTXState(txid);
+        assertNull(tx1);
+        tx1 =txMgr.getOrSetHostedTXState(txid, msg);
+        assertNotNull(tx1);
+        assertTrue(txMgr.getLock(tx1, txid));
+
+        latch.countDown();
+        
+        Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS).pollDelay(10, TimeUnit.MILLISECONDS)
+        .atMost(30, TimeUnit.SECONDS).until(() -> tx1.getLock().hasQueuedThreads()); 
+        
+        txMgr.removeHostedTXState(txid);
+        
+        tx2 =txMgr.getOrSetHostedTXState(txid, msg);
+        assertNotNull(tx2);
+        assertTrue(txMgr.getLock(tx2, txid));
+        
+        tx2.getLock().unlock();
+        tx1.getLock().unlock();
+      }
+    });
+    t1.start();
+            
+    assertTrue(latch.await(60, TimeUnit.SECONDS));
+  
+    tx = txMgr.masqueradeAs(msg);
+    assertNotNull(tx);
+    assertEquals(tx, tx2);
+    tx.getLock().unlock();
+
+    t1.join();
+
+  }
+  
+  @Test
+  public void hasTxAlreadyFinishedDetectsNoTx() {   
+    assertFalse(txMgr.hasTxAlreadyFinished(null, txid));
+  }
+  
+  @Test
+  public void hasTxAlreadyFinishedDetectsTxNotFinished() {
+    TXStateProxy tx = txMgr.getOrSetHostedTXState(notCompletedTxid, msg);
+    assertFalse(txMgr.hasTxAlreadyFinished(tx, notCompletedTxid));
+  }
+  
+  @Test
+  public void hasTxAlreadyFinishedDetectsTxFinished() throws InterruptedException {
+    TXStateProxy tx = txMgr.getOrSetHostedTXState(completedTxid, msg);    
+    txMgr.saveTXCommitMessageForClientFailover(completedTxid, txCommitMsg); 
+    assertTrue(txMgr.hasTxAlreadyFinished(tx, completedTxid));
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/384d379a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java
new file mode 100644
index 0000000..bbbf714
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.partitioned;
+
+import static org.mockito.Mockito.*;
+
+import java.io.IOException;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.internal.stubbing.answers.CallsRealMethods;
+
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.query.QueryException;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.internal.cache.DataLocationException;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.TXId;
+import com.gemstone.gemfire.internal.cache.TXManagerImpl;
+import com.gemstone.gemfire.internal.cache.TXStateProxy;
+import com.gemstone.gemfire.internal.cache.TXStateProxyImpl;
+import com.gemstone.gemfire.test.fake.Fakes;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+
+@Category(UnitTest.class)
+public class PartitionMessageTest {
+
+  private GemFireCacheImpl cache;
+  private PartitionMessage msg;
+  private DistributionManager dm;
+  private PartitionedRegion pr;
+  private TXManagerImpl txMgr;
+  private TXId txid;
+  private long startTime = 1;
+  TXStateProxy tx;
+  
+  @Before
+  public void setUp() throws PRLocallyDestroyedException, InterruptedException {
+    cache = Fakes.cache();
+    dm = mock(DistributionManager.class);  
+    msg = mock(PartitionMessage.class);
+    pr = mock(PartitionedRegion.class);
+    txMgr = mock(TXManagerImpl.class);
+    tx = mock(TXStateProxyImpl.class);
+    txid = new TXId(null, 0);
+    
+    when(msg.checkCacheClosing(dm)).thenReturn(false);
+    when(msg.checkDSClosing(dm)).thenReturn(false);
+    when(msg.getPartitionedRegion()).thenReturn(pr);
+    when(msg.getGemFireCacheImpl()).thenReturn(cache);
+    when(msg.getStartPartitionMessageProcessingTime(pr)).thenReturn(startTime);
+    when(msg.getTXManagerImpl(cache)).thenReturn(txMgr);
+    when(msg.hasTxAlreadyFinished(null, txMgr, txid)).thenCallRealMethod();
+    
+    doAnswer(new CallsRealMethods()).when(msg).process(dm);    
+  }
+
+  @Test
+  public void messageWithNoTXPerformsOnRegion() throws InterruptedException, CacheException, QueryException, DataLocationException, IOException {   
+    when(txMgr.masqueradeAs(msg)).thenReturn(null);
+    msg.process(dm);
+    
+    verify(msg, times(1)).operateOnPartitionedRegion(dm, pr, startTime);
+  }
+  
+  @Test
+  public void messageForNotFinishedTXPerformsOnRegion() throws InterruptedException, CacheException, QueryException, DataLocationException, IOException {   
+    when(txMgr.masqueradeAs(msg)).thenReturn(tx);
+    msg.process(dm);
+    
+    verify(msg, times(1)).operateOnPartitionedRegion(dm, pr, startTime);
+  }
+  
+  @Test
+  public void messageForFinishedTXDoesNotPerformOnRegion() throws InterruptedException, CacheException, QueryException, DataLocationException, IOException {   
+    when(txMgr.masqueradeAs(msg)).thenReturn(tx);
+    when(msg.hasTxAlreadyFinished(tx, txMgr, txid)).thenReturn(true);
+    msg.process(dm);
+  
+    verify(msg, times(0)).operateOnPartitionedRegion(dm, pr, startTime);
+  }
+
+}