You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2016/05/24 19:12:34 UTC

[1/2] incubator-geode git commit: Handle inflight p2p transaction message received later than failover message. Add unit test cases.

Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-1400 [created] 2d70868b5


Handle inflight p2p transaction message received later than failover message.
Add unit test cases.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/8a6a805b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/8a6a805b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/8a6a805b

Branch: refs/heads/feature/GEODE-1400
Commit: 8a6a805b423095c87f63a75e32498a2964aba08b
Parents: 2234535
Author: eshu <es...@pivotal.io>
Authored: Tue May 24 12:04:52 2016 -0700
Committer: eshu <es...@pivotal.io>
Committed: Tue May 24 12:04:52 2016 -0700

----------------------------------------------------------------------
 .../cache/RemoteOperationMessageTest.java       | 121 ++++++++++
 .../internal/cache/TXManagerImplTest.java       | 230 +++++++++++++++++++
 .../cache/partitioned/PartitionMessageTest.java | 131 +++++++++++
 3 files changed, 482 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8a6a805b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java
new file mode 100644
index 0000000..f965a45
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import static org.mockito.Mockito.*;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.internal.stubbing.answers.CallsRealMethods;
+
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.TXId;
+import com.gemstone.gemfire.internal.cache.TXManagerImpl;
+import com.gemstone.gemfire.internal.cache.TXStateProxy;
+import com.gemstone.gemfire.internal.cache.TXStateProxyImpl;
+import com.gemstone.gemfire.test.fake.Fakes;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+
+/**
+ * Unit tests for how {@link RemoteOperationMessage#process(DistributionManager)} treats
+ * transactional messages: the region operation has to run when the message carries no
+ * transaction or the transaction is still in progress, and has to be skipped once the
+ * transaction is reported as recently completed.
+ */
+@Category(UnitTest.class)
+public class RemoteOperationMessageTest {
+  private GemFireCacheImpl cache;
+  private RemoteOperationMessage msg;
+  private DistributionManager dm;
+  private LocalRegion r;
+  private TXManagerImpl txMgr;
+  private TXId txid;
+  private long startTime = 0;
+  TXStateProxy tx;
+
+  /**
+   * Builds a mocked message whose collaborator lookups are stubbed, while
+   * {@code process(dm)} itself runs the real implementation.
+   */
+  @Before
+  public void setUp() {
+    cache = Fakes.cache();
+    dm = mock(DistributionManager.class);
+    msg = mock(RemoteOperationMessage.class);
+    r = mock(LocalRegion.class);
+    txMgr = new TXManagerImpl(null, cache);
+    txid = new TXId(null, 0);
+    tx = mock(TXStateProxyImpl.class);
+
+    when(msg.checkCacheClosing(dm)).thenReturn(false);
+    when(msg.checkDSClosing(dm)).thenReturn(false);
+    when(msg.getCache(dm)).thenReturn(cache);
+    when(msg.getRegionByPath(cache)).thenReturn(r);
+    when(msg.getTXManager(cache)).thenReturn(txMgr);
+
+    // public Mockito API instead of the internal CallsRealMethods answer
+    doCallRealMethod().when(msg).process(dm);
+  }
+
+  // The tests declare "throws Exception" so that an unexpected checked exception fails the
+  // test instead of being printed and ignored.
+
+  @Test
+  public void messageWithoutTxPerformsOnRegion() throws Exception {
+    when(msg.masqueradeAs(msg, txMgr)).thenReturn(null);
+    when(msg.hasTxAlreadyFinished(null, txMgr)).thenCallRealMethod();
+
+    msg.process(dm);
+
+    verify(msg, times(1)).operateOnRegion(dm, r, startTime);
+  }
+
+  @Test
+  public void messageForUnFinishedTXPerformsOnRegion() throws Exception {
+    when(msg.masqueradeAs(msg, txMgr)).thenReturn(tx);
+    when(msg.hasTxAlreadyFinished(tx, txMgr)).thenCallRealMethod();
+
+    msg.process(dm);
+
+    verify(msg, times(1)).operateOnRegion(dm, r, startTime);
+  }
+
+  @Test
+  public void messageForFinishedTXDoesNotPerformOnRegion() throws Exception {
+    when(msg.masqueradeAs(msg, txMgr)).thenReturn(tx);
+    when(msg.hasTXRecentlyCompleted(txid, txMgr)).thenReturn(true);
+    when(msg.hasTxAlreadyFinished(tx, txMgr)).thenCallRealMethod();
+
+    msg.process(dm);
+
+    verify(msg, never()).operateOnRegion(dm, r, startTime);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8a6a805b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java
new file mode 100644
index 0000000..d55e045
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.internal.cache;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.internal.cache.partitioned.DestroyMessage;
+import com.gemstone.gemfire.test.fake.Fakes;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+
+@Category(UnitTest.class)
+public class TXManagerImplTest {
+  private TXManagerImpl txMgr;
+  TXId txid;
+  DestroyMessage msg;
+  TXCommitMessage txCommitMsg;
+  TXStateProxy globaltx1;
+  TXStateProxy globaltx2;
+
+  @Before
+  public void setUp() {
+    Cache cache = Fakes.cache();
+    txMgr = new TXManagerImpl(null, cache);
+    txid = new TXId(null, 0);
+    msg = mock(DestroyMessage.class);    
+    txCommitMsg = mock(TXCommitMessage.class);
+    
+    when(this.msg.canStartRemoteTransaction()).thenReturn(true);
+    when(this.msg.canParticipateInTransaction()).thenReturn(true);
+  }
+  
+  @Test
+  public void getOrSetHostedTXStateAbleToSetTXStateAndGetLock(){    
+    TXStateProxy tx = txMgr.getOrSetHostedTXState(txid, msg);
+    
+    assertNotNull(tx);
+    assertEquals(tx, txMgr.getHostedTXState(txid));  
+    assertTrue(txMgr.getLock(tx, txid));
+  }
+
+  @Test
+  public void getLockAfterTXStateRemoved(){
+    TXStateProxy tx = txMgr.getOrSetHostedTXState(txid, msg);
+    
+    assertEquals(tx, txMgr.getHostedTXState(txid));  
+    assertTrue(txMgr.getLock(tx, txid));
+    assertNotNull(tx);
+    assertTrue(txMgr.getLock(tx, txid));
+    tx.getLock().unlock();
+       
+    TXStateProxy oldtx = txMgr.getOrSetHostedTXState(txid, msg);    
+    assertEquals(tx, oldtx);
+    
+    Thread t1 = new Thread(new Runnable() {
+      public void run() {
+        txMgr.removeHostedTXState(txid);
+      }
+    });
+    t1.start();
+    
+    try {
+      t1.join();
+    } catch (InterruptedException e) {
+      
+    }
+    
+    TXStateProxy curTx = txMgr.getHostedTXState(txid);
+    assertNull(curTx);
+    
+    //after failover command removed the txid from hostedTXState,
+    //getLock should put back the original TXStateProxy
+    assertTrue(txMgr.getLock(tx, txid));
+    assertEquals(tx, txMgr.getHostedTXState(txid));
+    
+    tx.getLock().unlock();
+  }
+  
+  @Test
+  public void getLockAfterTXStateReplaced(){  
+    TXStateProxy oldtx = txMgr.getOrSetHostedTXState(txid, msg);
+    
+    assertEquals(oldtx, txMgr.getHostedTXState(txid));  
+    assertTrue(txMgr.getLock(oldtx, txid));
+    assertNotNull(oldtx);
+    oldtx.getLock().unlock();
+       
+    TXStateProxy tx = txMgr.getOrSetHostedTXState(txid, msg);    
+    assertEquals(tx, oldtx);
+    
+    Thread t1 = new Thread(new Runnable() {
+      public void run() {
+        txMgr.removeHostedTXState(txid);
+        //replace with new TXState
+        txMgr.getOrSetHostedTXState(txid, msg);
+      }
+    });
+    t1.start();
+    
+    try {
+      t1.join();
+    } catch (InterruptedException e) {
+      
+    }
+    
+    TXStateProxy curTx = txMgr.getHostedTXState(txid);
+    assertNotNull(curTx);
+    //replaced
+    assertNotEquals(tx, curTx);
+    
+    //after TXStateProxy replaced, getLock will not get 
+    assertFalse(txMgr.getLock(tx, txid));
+    
+  }
+  
+  @Test
+  public void getLockAfterTXStateCommitted(){  
+    TXStateProxy oldtx = txMgr.getOrSetHostedTXState(txid, msg);
+    
+    assertEquals(oldtx, txMgr.getHostedTXState(txid));  
+    assertTrue(txMgr.getLock(oldtx, txid));
+    assertNotNull(oldtx);
+    oldtx.getLock().unlock();
+       
+    TXStateProxy tx = txMgr.getOrSetHostedTXState(txid, msg);    
+    assertEquals(tx, oldtx);
+    
+    Thread t1 = new Thread(new Runnable() {
+      public void run() {
+        txMgr.removeHostedTXState(txid);
+        txMgr.saveTXCommitMessageForClientFailover(txid, txCommitMsg);
+      }
+    });
+    t1.start();
+    
+    try {
+      t1.join();
+    } catch (InterruptedException e) {
+      
+    }
+    
+    TXStateProxy curTx = txMgr.getHostedTXState(txid);
+    assertNull(curTx);
+    
+    //after TXStateProxy committed, getLock will get the lock for the oldtx
+    //but caller should not perform ops on this TXStateProxy
+    assertTrue(txMgr.getLock(tx, txid));    
+  } 
+  
+  @Test
+  public void masqueradeAsCanGetLock(){  
+    TXStateProxy tx;
+    try {
+      tx = txMgr.masqueradeAs(msg);
+      assertNotNull(tx);
+    } catch (InterruptedException ie) {
+      
+    }
+  }
+  
+  @Test
+  public void masqueradeAsCanGetLockAfterTXStateIsReplaced(){  
+    TXStateProxy tx;
+    
+    Thread t1 = new Thread(new Runnable() {
+      public void run() {
+        TXStateProxy tx1, tx2;
+
+        tx1 = txMgr.getHostedTXState(txid);
+        assertNull(tx1);
+        tx1 =txMgr.getOrSetHostedTXState(txid, msg);
+        assertNotNull(tx1);
+        assertTrue(txMgr.getLock(tx1, txid));
+        try {
+          Thread.sleep(20);
+        } catch (InterruptedException e) {
+        }
+        txMgr.removeHostedTXState(txid);
+        
+        tx2 =txMgr.getOrSetHostedTXState(txid, msg);
+        assertNotNull(tx2);
+        assertTrue(txMgr.getLock(tx2, txid));
+        
+        tx2.getLock().unlock();
+        tx1.getLock().unlock();
+      }
+    });
+    t1.start();
+    
+    try {
+      Thread.sleep(10);
+    } catch (InterruptedException e) {
+      
+    }
+    try {
+      tx = txMgr.masqueradeAs(msg);
+      assertNotNull(tx);
+      tx.getLock().unlock();
+      try {
+        t1.join();
+      } catch (InterruptedException e) {
+        
+      }
+    } catch (InterruptedException ie) {
+      
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8a6a805b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java
new file mode 100644
index 0000000..bfc3ff6
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.partitioned;
+
+import static org.mockito.Mockito.*;
+
+import java.io.IOException;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.internal.stubbing.answers.CallsRealMethods;
+
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.query.QueryException;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.internal.cache.DataLocationException;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.TXId;
+import com.gemstone.gemfire.internal.cache.TXManagerImpl;
+import com.gemstone.gemfire.internal.cache.TXStateProxy;
+import com.gemstone.gemfire.internal.cache.TXStateProxyImpl;
+import com.gemstone.gemfire.test.fake.Fakes;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+
+/**
+ * Unit tests for how {@link PartitionMessage#process(DistributionManager)} treats
+ * transactional messages: the partitioned region operation has to run when the message
+ * carries no transaction or the transaction is still in progress, and has to be skipped once
+ * the transaction is reported as recently completed.
+ */
+@Category(UnitTest.class)
+public class PartitionMessageTest {
+
+  private GemFireCacheImpl cache;
+  private PartitionMessage msg;
+  private DistributionManager dm;
+  private PartitionedRegion pr;
+  private TXManagerImpl txMgr;
+  private TXId txid;
+  private long startTime = 1;
+  TXStateProxy tx;
+
+  /**
+   * Builds a mocked message whose collaborator lookups are stubbed, while
+   * {@code process(dm)} itself runs the real implementation.
+   */
+  @Before
+  public void setUp() throws Exception {
+    cache = Fakes.cache();
+    dm = mock(DistributionManager.class);
+    msg = mock(PartitionMessage.class);
+    pr = mock(PartitionedRegion.class);
+    txMgr = new TXManagerImpl(null, cache);
+    tx = mock(TXStateProxyImpl.class);
+    txid = new TXId(null, 0);
+
+    when(msg.checkCacheClosing(dm)).thenReturn(false);
+    when(msg.checkDSClosing(dm)).thenReturn(false);
+    when(msg.getPartitionedRegion()).thenReturn(pr);
+    when(msg.getGemFireCacheImpl()).thenReturn(cache);
+    when(msg.getStartPartitionMessageProcessingTime(pr)).thenReturn(startTime);
+    when(msg.getTXManagerImpl(cache)).thenReturn(txMgr);
+
+    // public Mockito API instead of the internal CallsRealMethods answer
+    doCallRealMethod().when(msg).process(dm);
+  }
+
+  // The tests declare "throws Exception" so that an unexpected checked exception fails the
+  // test instead of being printed and ignored.
+
+  @Test
+  public void messageWithoutTxPerformsOnRegion() throws Exception {
+    when(msg.masqueradeAs(msg, txMgr)).thenReturn(null);
+    when(msg.hasTxAlreadyFinished(null, txMgr)).thenCallRealMethod();
+
+    msg.process(dm);
+
+    verify(msg, times(1)).operateOnPartitionedRegion(dm, pr, startTime);
+  }
+
+  @Test
+  public void messageForUnFinishedTXPerformsOnRegion() throws Exception {
+    when(msg.masqueradeAs(msg, txMgr)).thenReturn(tx);
+    when(msg.hasTxAlreadyFinished(tx, txMgr)).thenCallRealMethod();
+
+    msg.process(dm);
+
+    verify(msg, times(1)).operateOnPartitionedRegion(dm, pr, startTime);
+  }
+
+  @Test
+  public void messageForFinishedTXDoesNotPerformOnRegion() throws Exception {
+    when(msg.masqueradeAs(msg, txMgr)).thenReturn(tx);
+    when(msg.hasTXRecentlyCompleted(txid, txMgr)).thenReturn(true);
+    when(msg.hasTxAlreadyFinished(tx, txMgr)).thenCallRealMethod();
+
+    msg.process(dm);
+
+    verify(msg, never()).operateOnPartitionedRegion(dm, pr, startTime);
+  }
+
+}


[2/2] incubator-geode git commit: Handle late arrival of inflight p2p transaction message.

Posted by es...@apache.org.
Handle late arrival of inflight p2p transaction message.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/2d70868b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/2d70868b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/2d70868b

Branch: refs/heads/feature/GEODE-1400
Commit: 2d70868b56c57ff571b238ef9acc83d6fd319a6c
Parents: 8a6a805
Author: eshu <es...@pivotal.io>
Authored: Tue May 24 12:08:40 2016 -0700
Committer: eshu <es...@pivotal.io>
Committed: Tue May 24 12:08:40 2016 -0700

----------------------------------------------------------------------
 .../internal/cache/RemoteOperationMessage.java  | 60 +++++++++++++++---
 .../gemfire/internal/cache/TXManagerImpl.java   | 49 ++++++++++++---
 .../cache/partitioned/PartitionMessage.java     | 64 +++++++++++++++++---
 3 files changed, 149 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2d70868b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
index 42ce811..95ad647 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
@@ -187,7 +187,7 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
   /**
    * check to see if the cache is closing
    */
-  final public boolean checkCacheClosing(DistributionManager dm) {
+  public boolean checkCacheClosing(DistributionManager dm) {
     GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
     // return (cache != null && cache.isClosed());
     return cache == null || cache.isClosed();
@@ -197,11 +197,35 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
    * check to see if the distributed system is closing
    * @return true if the distributed system is closing
    */
-  final public boolean checkDSClosing(DistributionManager dm) {
+  public boolean checkDSClosing(DistributionManager dm) {
     InternalDistributedSystem ds = dm.getSystem();
     return (ds == null || ds.isDisconnecting());
   }
   
+  boolean hasTxAlreadyFinished(TXStateProxy tx, TXManagerImpl txMgr) {
+    if (tx == null) {
+      return false;
+    } else {
+      TXId txid = new TXId(getMemberToMasqueradeAs(), getTXUniqId());
+      if (hasTXRecentlyCompleted(txid, txMgr)) {
+        //Should only happen when handling a later arrival of transactional op from proxy,
+        //while the transaction has failed over and already committed or rolled back.
+        //Just send back reply as a success op.
+        //The client connection should be lost from proxy, or
+        //the proxy is closed for failover to occur.
+        logger.info("TxId {} has already finished." , txid);
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+
+  boolean hasTXRecentlyCompleted(TXId txid, TXManagerImpl txMgr) {
+    return txMgr.isHostedTxRecentlyCompleted(txid);
+  }
+
   /**
    * Upon receipt of the message, both process the message and send an
    * acknowledgement, not necessarily in that order. Note: Any hang in this
@@ -222,8 +246,8 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
         thr = new CacheClosedException(LocalizedStrings.PartitionMessage_REMOTE_CACHE_IS_CLOSED_0.toLocalizedString(dm.getId()));
         return;
       }
-      GemFireCacheImpl gfc = (GemFireCacheImpl)CacheFactory.getInstance(dm.getSystem());
-      r = gfc.getRegionByPathForProcessing(this.regionPath);
+      GemFireCacheImpl gfc = getCache(dm);
+      r = getRegionByPath(gfc);
       if (r == null && failIfRegionMissing()) {
         // if the distributed system is disconnecting, don't send a reply saying
         // the partitioned region can't be found (bug 36585)
@@ -235,11 +259,13 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
       thr = UNHANDLED_EXCEPTION;
       
       // [bruce] r might be null here, so we have to go to the cache instance to get the txmgr
-      TXManagerImpl txMgr = GemFireCacheImpl.getInstance().getTxManager();
+      TXManagerImpl txMgr = getTXManager(gfc);
       TXStateProxy tx = null;
       try {
-        tx = txMgr.masqueradeAs(this);
-        sendReply = operateOnRegion(dm, r, startTime);
+        tx = masqueradeAs(this, txMgr);
+        if (!hasTxAlreadyFinished(tx, txMgr)) {
+          sendReply = operateOnRegion(dm, r, startTime);
+        }
       } finally {
         txMgr.unmasquerade(tx);
       }
@@ -310,6 +336,26 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
       } 
     }
   }
+
+
+  TXStateProxy masqueradeAs(TransactionMessage msg, TXManagerImpl txMgr) throws InterruptedException {
+    return txMgr.masqueradeAs(msg);
+  }
+
+
+  TXManagerImpl getTXManager(GemFireCacheImpl cache) {
+    return cache.getTxManager();
+  }
+
+
+  LocalRegion getRegionByPath(GemFireCacheImpl gfc) {
+    return gfc.getRegionByPathForProcessing(this.regionPath);
+  }
+
+
+  GemFireCacheImpl getCache(final DistributionManager dm) {
+    return (GemFireCacheImpl)CacheFactory.getInstance(dm.getSystem());
+  }
   
   /** Send a generic ReplyMessage.  This is in a method so that subclasses can override the reply message type
    * @param pr the Partitioned Region for the message whose statistics are incremented

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2d70868b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
index 4b2f904..d83647b 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
@@ -729,8 +729,22 @@ public final class TXManagerImpl implements CacheTransactionManager,
       return null;
     }
     TXId key = new TXId(msg.getMemberToMasqueradeAs(), msg.getTXUniqId());
-    TXStateProxy val;
-    val = this.hostedTXStates.get(key);
+    TXStateProxy val = getOrSetHostedTXState(key, msg);
+
+    if (val != null) {
+      boolean success = getLock(val, key);
+      while (!success) {
+        val = getOrSetHostedTXState(key, msg);
+        success = getLock(val, key);
+      }
+    }
+
+    setTXState(val);
+    return val;
+  }
+
+  public TXStateProxy getOrSetHostedTXState(TXId key, TransactionMessage msg) {
+    TXStateProxy val = this.hostedTXStates.get(key);
     if (val == null) {
       synchronized(this.hostedTXStates) {
         val = this.hostedTXStates.get(key);
@@ -746,14 +760,33 @@ public final class TXManagerImpl implements CacheTransactionManager,
         }
       }
     }
-    if (val != null) {
-      if (!val.getLock().isHeldByCurrentThread()) {
-        val.getLock().lock();
+    return val;
+  }
+
+  boolean getLock(TXStateProxy val, TXId key) { // false only when a different proxy is now hosted under key
+    if (!val.getLock().isHeldByCurrentThread()) { // a thread already holding val's lock skips the re-check below
+      val.getLock().lock();
+      synchronized (this.hostedTXStates) {
+        TXStateProxy curVal = this.hostedTXStates.get(key);
+        // An in-flight op can be received later than the TXFailover operation.
+        if (curVal == null) {
+          if (!isHostedTxRecentlyCompleted(key)) {
+            this.hostedTXStates.put(key, val);
+            // The failover op removed val and the tx is not recorded as recently
+            // completed, so val is hosted again. The same operation may then be executed
+            // twice by two threads, but the data stays consistent.
+          }
+        } else {
+          if (val != curVal) {
+            // The failover op replaced val with a new TXStateProxyImpl;
+            // release val's lock here, the new proxy is the one to use instead.
+            val.getLock().unlock();
+            return false;
+          }
+        }
       }
     }
-
-    setTXState(val);
-    return val;
+    return true;
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2d70868b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
index 626efef..b35133a 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
@@ -58,6 +58,7 @@ import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 import com.gemstone.gemfire.internal.cache.PartitionedRegion;
 import com.gemstone.gemfire.internal.cache.PartitionedRegionException;
 import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
+import com.gemstone.gemfire.internal.cache.TXId;
 import com.gemstone.gemfire.internal.cache.TXManagerImpl;
 import com.gemstone.gemfire.internal.cache.TXStateProxy;
 import com.gemstone.gemfire.internal.cache.TransactionMessage;
@@ -262,8 +263,8 @@ public abstract class PartitionMessage extends DistributionMessage implements
   /**
    * check to see if the cache is closing
    */
-  final public boolean checkCacheClosing(DistributionManager dm) {
-    GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+  public boolean checkCacheClosing(DistributionManager dm) {
+    GemFireCacheImpl cache = getGemFireCacheImpl();
     // return (cache != null && cache.isClosed());
     return cache == null || cache.isClosed();
   }
@@ -272,11 +273,54 @@ public abstract class PartitionMessage extends DistributionMessage implements
    * check to see if the distributed system is closing
    * @return true if the distributed system is closing
    */
-  final public boolean checkDSClosing(DistributionManager dm) {
+  public boolean checkDSClosing(DistributionManager dm) {
     InternalDistributedSystem ds = dm.getSystem();
     return (ds == null || ds.isDisconnecting());
   }
   
+  boolean hasTxAlreadyFinished(TXStateProxy tx, TXManagerImpl txMgr) {
+    if (tx == null) {
+      return false;
+    } else {
+      TXId txid = new TXId(getMemberToMasqueradeAs(), getTXUniqId());
+      if (hasTXRecentlyCompleted(txid, txMgr)) {
+        //Should only happen when handling a later arrival of transactional op from proxy,
+        //while the transaction has failed over and already committed or rolled back.
+        //Just send back reply as a success op.
+        //The client connection should be lost from proxy, or
+        //the proxy is closed for failover to occur.
+        logger.info("TxId {} has already finished." , txid);
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+  
+  PartitionedRegion getPartitionedRegion() throws PRLocallyDestroyedException {
+    return PartitionedRegion.getPRFromId(this.regionId);
+  }
+  
+  GemFireCacheImpl getGemFireCacheImpl() {
+    return GemFireCacheImpl.getInstance();
+  }
+
+  TXManagerImpl getTXManagerImpl(GemFireCacheImpl cache) {
+    return cache.getTxManager();
+  }
+  
+  long getStartPartitionMessageProcessingTime(PartitionedRegion pr) {
+    return pr.getPrStats().startPartitionMessageProcessing();
+  }
+  
+  TXStateProxy masqueradeAs(TransactionMessage msg, TXManagerImpl txMgr) throws InterruptedException {
+    return txMgr.masqueradeAs(msg);
+  }
+  
+  boolean hasTXRecentlyCompleted(TXId txid, TXManagerImpl txMgr) {
+    return txMgr.isHostedTxRecentlyCompleted(txid);
+  }
+  
   /**
    * Upon receipt of the message, both process the message and send an
    * acknowledgement, not necessarily in that order. Note: Any hang in this
@@ -298,7 +342,7 @@ public abstract class PartitionMessage extends DistributionMessage implements
         thr = new CacheClosedException(LocalizedStrings.PartitionMessage_REMOTE_CACHE_IS_CLOSED_0.toLocalizedString(dm.getId()));
         return;
       }
-      pr = PartitionedRegion.getPRFromId(this.regionId);
+      pr = getPartitionedRegion();
       if (pr == null && failIfRegionMissing()) {
         // if the distributed system is disconnecting, don't send a reply saying
         // the partitioned region can't be found (bug 36585)
@@ -307,19 +351,21 @@ public abstract class PartitionMessage extends DistributionMessage implements
       }
 
       if (pr != null) {
-        startTime = pr.getPrStats().startPartitionMessageProcessing();
+        startTime = getStartPartitionMessageProcessingTime(pr);
       }
       thr = UNHANDLED_EXCEPTION;
       
-      GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+      GemFireCacheImpl cache = getGemFireCacheImpl();
       if(cache==null) {
         throw new ForceReattemptException(LocalizedStrings.PartitionMessage_REMOTE_CACHE_IS_CLOSED_0.toLocalizedString());
       }
-      TXManagerImpl txMgr = cache.getTxManager();
+      TXManagerImpl txMgr = getTXManagerImpl(cache);
       TXStateProxy tx = null;
       try {
-        tx = txMgr.masqueradeAs(this);
-        sendReply = operateOnPartitionedRegion(dm, pr, startTime);
+        tx = masqueradeAs(this, txMgr);
+        if (!hasTxAlreadyFinished(tx, txMgr)) {
+          sendReply = operateOnPartitionedRegion(dm, pr, startTime);
+        }
       } finally {
         txMgr.unmasquerade(tx);
       }