You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2016/06/13 23:49:24 UTC

incubator-geode git commit: GEODE-1517: fix TXStateProxyImpl continuing to work after TXManagerImpl is closed.

Repository: incubator-geode
Updated Branches:
  refs/heads/develop d07c966bb -> 6f70cd703


GEODE-1517: fix TXStateProxyImpl continuing to work after TXManagerImpl is closed.

Hold the ReentrantLock during TXManagerImpl.close() when closing a TXStateProxyImpl. This prevents any TXStateProxyImpl from doing further work. Add a test case that fails without the above fix and passes with it.

Also avoid going through the synchronized failover map to check whether a TXStateProxyImpl is finished; instead, use its isInProgress() call.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/6f70cd70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/6f70cd70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/6f70cd70

Branch: refs/heads/develop
Commit: 6f70cd703139417388fbd9845dd8aefb3b3f7c30
Parents: d07c966
Author: eshu <es...@pivotal.io>
Authored: Mon Jun 13 16:44:55 2016 -0700
Committer: eshu <es...@pivotal.io>
Committed: Mon Jun 13 16:44:55 2016 -0700

----------------------------------------------------------------------
 .../internal/cache/RemoteOperationMessage.java  | 10 ++-
 .../gemfire/internal/cache/TXManagerImpl.java   | 32 ++++-----
 .../cache/partitioned/PartitionMessage.java     | 10 ++-
 .../cache/RemoteOperationMessageTest.java       | 31 +++++++--
 .../internal/cache/TXManagerImplTest.java       | 73 +++++++++++++++-----
 .../cache/partitioned/PartitionMessageTest.java | 27 ++++++--
 6 files changed, 120 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6f70cd70/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
index 19e1dea..db5bcca 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
@@ -241,8 +241,10 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
         sendReply = operateOnRegion(dm, r, startTime);        
       } else {
         try {
-          TXId txid = new TXId(getMemberToMasqueradeAs(), getTXUniqId());
-          if (!hasTxAlreadyFinished(tx, txMgr, txid)) {
+          if (txMgr.isClosed()) {
+            // NO DISTRIBUTED MESSAGING CAN BE DONE HERE!
+            sendReply = false;
+          } else if (tx.isInProgress()) {
             sendReply = operateOnRegion(dm, r, startTime);       
           }  
         } finally {
@@ -317,10 +319,6 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
     }
   }
 
-  boolean hasTxAlreadyFinished(TXStateProxy tx, TXManagerImpl txMgr, TXId txid) {
-    return txMgr.hasTxAlreadyFinished(tx, txid);
-  }
-
   TXManagerImpl getTXManager(GemFireCacheImpl cache) {
     return cache.getTxManager();
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6f70cd70/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
index 2608878..1ea7f71 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
@@ -90,7 +90,7 @@ public class TXManagerImpl implements CacheTransactionManager,
   
   private final ArrayList<TransactionListener> txListeners = new ArrayList<TransactionListener>(8);
   public TransactionWriter writer = null;
-  private boolean closed = false;
+  private volatile boolean closed = false;
 
   private final Map<TXId, TXStateProxy> hostedTXStates;
 
@@ -577,7 +577,12 @@ public class TXManagerImpl implements CacheTransactionManager,
     }
     this.closed = true;
     for (TXStateProxy proxy: this.hostedTXStates.values()) {
-      proxy.close();
+      proxy.getLock().lock();
+      try {
+        proxy.close();
+      } finally {
+        proxy.getLock().unlock();
+      }
     }
     for (TXStateProxy proxy: this.localTxMap.values()) {
       proxy.close();
@@ -646,7 +651,7 @@ public class TXManagerImpl implements CacheTransactionManager,
     }
   }
 
-  private final boolean isClosed() {
+  public final boolean isClosed() {
     return this.closed;
   }
   private final void checkClosed() {
@@ -752,10 +757,13 @@ public class TXManagerImpl implements CacheTransactionManager,
         // Inflight op could be received later than TXFailover operation.
         if (curVal == null) {
           if (!isHostedTxRecentlyCompleted(key)) {
-            this.hostedTXStates.put(key, val);
             // Failover op removed the val
             // It is possible that the same operation can be executed
             // twice by two threads, but data is consistent.
+            this.hostedTXStates.put(key, val);
+          } else {
+            //Another thread should complete the transaction
+            logger.info("{} has already finished." , val.getTxId());
           }
         } else {
           if (val != curVal) {
@@ -770,22 +778,6 @@ public class TXManagerImpl implements CacheTransactionManager,
     return true;
   }
   
-  public boolean hasTxAlreadyFinished(TXStateProxy tx, TXId txid) {
-    if (tx == null) {
-      return false;
-    }
-    if (isHostedTxRecentlyCompleted(txid)) {
-      //Should only happen when handling a later arrival of transactional op from proxy,
-      //while the transaction has failed over and already committed or rolled back.
-      //Just send back reply as a success op.
-      //The client connection should be lost from proxy, or
-      //the proxy is closed for failover to occur.
-      logger.info("TxId {} has already finished." , txid);
-      return true;
-    }
-    return false;
-  }
-  
   /**
    * Associate the remote txState with the thread processing this message. Also,
    * we acquire a lock on the txState, on which this thread operates.

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6f70cd70/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
index 1b83ee3..14fce08 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
@@ -278,10 +278,6 @@ public abstract class PartitionMessage extends DistributionMessage implements
     return (ds == null || ds.isDisconnecting());
   }
   
-  boolean hasTxAlreadyFinished(TXStateProxy tx, TXManagerImpl txMgr, TXId txid) {
-    return txMgr.hasTxAlreadyFinished(tx, txid);
-  }
-  
   PartitionedRegion getPartitionedRegion() throws PRLocallyDestroyedException {
     return PartitionedRegion.getPRFromId(this.regionId);
   }
@@ -343,8 +339,10 @@ public abstract class PartitionMessage extends DistributionMessage implements
         sendReply = operateOnPartitionedRegion(dm, pr, startTime);        
       } else {
         try {
-          TXId txid = new TXId(getMemberToMasqueradeAs(), getTXUniqId());
-          if (!hasTxAlreadyFinished(tx, txMgr, txid)) {
+          if (txMgr.isClosed()) {
+            // NO DISTRIBUTED MESSAGING CAN BE DONE HERE!
+            sendReply = false;
+          } else if (tx.isInProgress()) {
             sendReply = operateOnPartitionedRegion(dm, pr, startTime);        
           }  
         } finally {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6f70cd70/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java
index ecfc2b0..18ce2a9 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java
@@ -25,7 +25,6 @@ import org.mockito.internal.stubbing.answers.CallsRealMethods;
 
 import com.gemstone.gemfire.distributed.internal.DistributionManager;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.TXId;
 import com.gemstone.gemfire.internal.cache.TXManagerImpl;
 import com.gemstone.gemfire.internal.cache.TXStateProxy;
 import com.gemstone.gemfire.internal.cache.TXStateProxyImpl;
@@ -40,7 +39,6 @@ public class RemoteOperationMessageTest {
   private DistributionManager dm;
   private LocalRegion r;
   private TXManagerImpl txMgr;
-  private TXId txid;
   private long startTime = 0;
   TXStateProxy tx;
   
@@ -51,7 +49,6 @@ public class RemoteOperationMessageTest {
     msg = mock(RemoteOperationMessage.class);
     r = mock(LocalRegion.class);
     txMgr = mock(TXManagerImpl.class);
-    txid = new TXId(null, 0);
     tx = mock(TXStateProxyImpl.class);
     
     when(msg.checkCacheClosing(dm)).thenReturn(false);
@@ -59,9 +56,8 @@ public class RemoteOperationMessageTest {
     when(msg.getCache(dm)).thenReturn(cache);
     when(msg.getRegionByPath(cache)).thenReturn(r);
     when(msg.getTXManager(cache)).thenReturn(txMgr);
-    when(txMgr.hasTxAlreadyFinished(tx, txid)).thenCallRealMethod();
 
-    doAnswer(new CallsRealMethods()).when(msg).process(dm);    
+    doAnswer(new CallsRealMethods()).when(msg).process(dm);
   }
   
   @Test
@@ -75,7 +71,7 @@ public class RemoteOperationMessageTest {
   @Test
   public void messageForNotFinishedTXPerformsOnRegion() throws InterruptedException, RemoteOperationException {
     when(txMgr.masqueradeAs(msg)).thenReturn(tx);
-    when(msg.hasTxAlreadyFinished(tx, txMgr, txid)).thenCallRealMethod(); 
+    when(tx.isInProgress()).thenReturn(true);
     msg.process(dm);
 
     verify(msg, times(1)).operateOnRegion(dm, r, startTime);
@@ -84,10 +80,31 @@ public class RemoteOperationMessageTest {
   @Test
   public void messageForFinishedTXDoesNotPerformOnRegion() throws InterruptedException, RemoteOperationException {
     when(txMgr.masqueradeAs(msg)).thenReturn(tx);
-    when(msg.hasTxAlreadyFinished(tx, txMgr, txid)).thenReturn(true); 
+    when(tx.isInProgress()).thenReturn(false); 
     msg.process(dm);
 
     verify(msg, times(0)).operateOnRegion(dm, r, startTime);
   }
 
+  @Test
+  public void noNewTxProcessingAfterTXManagerImplClosed() throws RemoteOperationException {
+    txMgr = new TXManagerImpl(null, cache);
+    
+    when(msg.checkCacheClosing(dm)).thenReturn(false);
+    when(msg.checkDSClosing(dm)).thenReturn(false);
+    when(msg.getCache(dm)).thenReturn(cache);
+    when(msg.getRegionByPath(cache)).thenReturn(r);
+    when(msg.getTXManager(cache)).thenReturn(txMgr);
+
+    when(msg.canParticipateInTransaction()).thenReturn(true);
+    when(msg.canStartRemoteTransaction()).thenReturn(true);
+    
+    msg.process(dm);
+    
+    txMgr.close();
+    
+    msg.process(dm);
+
+    verify(msg, times(1)).operateOnRegion(dm, r, startTime);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6f70cd70/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java
index ce24947..ae4f378 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java
@@ -49,6 +49,7 @@ public class TXManagerImplTest {
   TXStateProxy tx1, tx2;
   DistributionManager dm;
   TXRemoteRollbackMessage rollbackMsg;
+  TXRemoteCommitMessage commitMsg;
 
   @Before
   public void setUp() {
@@ -63,9 +64,11 @@ public class TXManagerImplTest {
     notCompletedTxid = new TXId(member, 2);
     latch = new CountDownLatch(1);
     rollbackMsg = new TXRemoteRollbackMessage();
+    commitMsg = new TXRemoteCommitMessage();
     
     when(this.msg.canStartRemoteTransaction()).thenReturn(true);
     when(this.msg.canParticipateInTransaction()).thenReturn(true);
+
   }
 
   @Test
@@ -142,9 +145,9 @@ public class TXManagerImplTest {
     assertFalse(txMgr.getLock(tx, txid));
     
   }
-  
+
   @Test
-  public void getLockAfterTXStateCommitted() throws InterruptedException{  
+  public void getLockAfterTXStateCommitted() throws InterruptedException{
     TXStateProxy oldtx = txMgr.getOrSetHostedTXState(txid, msg);
     
     assertEquals(oldtx, txMgr.getHostedTXState(txid));  
@@ -157,6 +160,20 @@ public class TXManagerImplTest {
     
     Thread t1 = new Thread(new Runnable() {
       public void run() {
+        when(msg.getTXOriginatorClient()).thenReturn(mock(InternalDistributedMember.class));
+        TXStateProxy tx;
+        try {
+          tx = txMgr.masqueradeAs(commitMsg);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+          throw new RuntimeException(e);
+        }
+        tx.setCommitOnBehalfOfRemoteStub(true);
+        try {
+          tx.commit();
+        } finally {
+          txMgr.unmasquerade(tx);
+        }
         txMgr.removeHostedTXState(txid);
         txMgr.saveTXCommitMessageForClientFailover(txid, txCommitMsg);
       }
@@ -168,11 +185,12 @@ public class TXManagerImplTest {
     TXStateProxy curTx = txMgr.getHostedTXState(txid);
     assertNull(curTx);
     
+    assertFalse(tx.isInProgress());
     //after TXStateProxy committed, getLock will get the lock for the oldtx
     //but caller should not perform ops on this TXStateProxy
     assertTrue(txMgr.getLock(tx, txid));    
   } 
-  
+
   @Test
   public void masqueradeAsCanGetLock() throws InterruptedException{  
     TXStateProxy tx;
@@ -222,21 +240,43 @@ public class TXManagerImplTest {
   }
   
   @Test
-  public void hasTxAlreadyFinishedDetectsNoTx() {   
-    assertFalse(txMgr.hasTxAlreadyFinished(null, txid));
+  public void testTxStateWithNotFinishedTx() {
+    TXStateProxy tx = txMgr.getOrSetHostedTXState(notCompletedTxid, msg);
+    assertTrue(tx.isInProgress());
   }
-  
+
   @Test
-  public void hasTxAlreadyFinishedDetectsTxNotFinished() {
-    TXStateProxy tx = txMgr.getOrSetHostedTXState(notCompletedTxid, msg);
-    assertFalse(txMgr.hasTxAlreadyFinished(tx, notCompletedTxid));
+  public void testTxStateWithCommittedTx() throws InterruptedException {
+    when(msg.getTXOriginatorClient()).thenReturn(mock(InternalDistributedMember.class));
+    setupTx(); 
+    
+    TXStateProxy tx = txMgr.masqueradeAs(commitMsg);
+    try {
+      tx.commit();
+    } finally {
+      txMgr.unmasquerade(tx);
+    }
+    assertFalse(tx.isInProgress());
   }
   
   @Test
-  public void hasTxAlreadyFinishedDetectsTxFinished() throws InterruptedException {
-    TXStateProxy tx = txMgr.getOrSetHostedTXState(completedTxid, msg);    
-    txMgr.saveTXCommitMessageForClientFailover(completedTxid, txCommitMsg); 
-    assertTrue(txMgr.hasTxAlreadyFinished(tx, completedTxid));
+  public void testTxStateWithRolledBackTx() throws InterruptedException {
+    when(msg.getTXOriginatorClient()).thenReturn(mock(InternalDistributedMember.class));
+    setupTx();
+    
+    TXStateProxy tx = txMgr.masqueradeAs(rollbackMsg);
+    try {
+      tx.rollback();
+    } finally {
+      txMgr.unmasquerade(tx);
+    }
+    assertFalse(tx.isInProgress());
+  }
+
+  private void setupTx() throws InterruptedException {
+    TXStateProxy tx = txMgr.masqueradeAs(msg);
+    tx.setCommitOnBehalfOfRemoteStub(true);
+    txMgr.unmasquerade(tx);
   }
 
   @Test
@@ -251,11 +291,8 @@ public class TXManagerImplTest {
           e.printStackTrace();
           throw new RuntimeException(e);
         }
-        try {
-          msg.process(dm);
-        } finally {
-          txMgr.unmasquerade(tx1);
-        }
+
+        msg.process(dm);
 
         TXStateProxy existingTx = masqueradeToRollback();
         latch.countDown();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6f70cd70/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java
index bbbf714..e05e130 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java
@@ -31,7 +31,6 @@ import com.gemstone.gemfire.distributed.internal.DistributionManager;
 import com.gemstone.gemfire.internal.cache.DataLocationException;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.cache.TXId;
 import com.gemstone.gemfire.internal.cache.TXManagerImpl;
 import com.gemstone.gemfire.internal.cache.TXStateProxy;
 import com.gemstone.gemfire.internal.cache.TXStateProxyImpl;
@@ -47,7 +46,6 @@ public class PartitionMessageTest {
   private DistributionManager dm;
   private PartitionedRegion pr;
   private TXManagerImpl txMgr;
-  private TXId txid;
   private long startTime = 1;
   TXStateProxy tx;
   
@@ -59,7 +57,6 @@ public class PartitionMessageTest {
     pr = mock(PartitionedRegion.class);
     txMgr = mock(TXManagerImpl.class);
     tx = mock(TXStateProxyImpl.class);
-    txid = new TXId(null, 0);
     
     when(msg.checkCacheClosing(dm)).thenReturn(false);
     when(msg.checkDSClosing(dm)).thenReturn(false);
@@ -67,9 +64,8 @@ public class PartitionMessageTest {
     when(msg.getGemFireCacheImpl()).thenReturn(cache);
     when(msg.getStartPartitionMessageProcessingTime(pr)).thenReturn(startTime);
     when(msg.getTXManagerImpl(cache)).thenReturn(txMgr);
-    when(msg.hasTxAlreadyFinished(null, txMgr, txid)).thenCallRealMethod();
     
-    doAnswer(new CallsRealMethods()).when(msg).process(dm);    
+    doAnswer(new CallsRealMethods()).when(msg).process(dm);     
   }
 
   @Test
@@ -83,6 +79,7 @@ public class PartitionMessageTest {
   @Test
   public void messageForNotFinishedTXPerformsOnRegion() throws InterruptedException, CacheException, QueryException, DataLocationException, IOException {   
     when(txMgr.masqueradeAs(msg)).thenReturn(tx);
+    when(tx.isInProgress()).thenReturn(true);
     msg.process(dm);
     
     verify(msg, times(1)).operateOnPartitionedRegion(dm, pr, startTime);
@@ -91,10 +88,28 @@ public class PartitionMessageTest {
   @Test
   public void messageForFinishedTXDoesNotPerformOnRegion() throws InterruptedException, CacheException, QueryException, DataLocationException, IOException {   
     when(txMgr.masqueradeAs(msg)).thenReturn(tx);
-    when(msg.hasTxAlreadyFinished(tx, txMgr, txid)).thenReturn(true);
+    when(tx.isInProgress()).thenReturn(false);
     msg.process(dm);
   
     verify(msg, times(0)).operateOnPartitionedRegion(dm, pr, startTime);
   }
 
+  @Test
+  public void noNewTxProcessingAfterTXManagerImplClosed() throws CacheException, QueryException, DataLocationException, InterruptedException, IOException {
+    txMgr = new TXManagerImpl(null, cache);
+    when(msg.getPartitionedRegion()).thenReturn(pr);
+    when(msg.getGemFireCacheImpl()).thenReturn(cache);
+    when(msg.getStartPartitionMessageProcessingTime(pr)).thenReturn(startTime);
+    when(msg.getTXManagerImpl(cache)).thenReturn(txMgr);
+    when(msg.canParticipateInTransaction()).thenReturn(true);
+    when(msg.canStartRemoteTransaction()).thenReturn(true);
+    
+    msg.process(dm);
+    
+    txMgr.close();
+    
+    msg.process(dm);
+
+    verify(msg, times(1)).operateOnPartitionedRegion(dm, pr, startTime);
+  }
 }