You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by sa...@apache.org on 2016/06/14 19:11:34 UTC
[84/90] [abbrv] incubator-geode git commit: GEODE-1517: fix
TXStateProxyImpl continue working after TXManagerImpl is closed.
GEODE-1517: fix TXStateProxyImpl continue working after TXManagerImpl is closed.
Hold each hosted TXStateProxyImpl's ReentrantLock while closing it during TXManagerImpl.close(). This prevents any TXStateProxyImpl from doing further work after the manager is closed. Add a test case that fails without the above fix and passes with it.
Also, avoid going through the synchronized failover map to check whether a TXStateProxyImpl is finished; instead, use its isInProgress() call.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/6f70cd70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/6f70cd70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/6f70cd70
Branch: refs/heads/feature/GEODE-93
Commit: 6f70cd703139417388fbd9845dd8aefb3b3f7c30
Parents: d07c966
Author: eshu <es...@pivotal.io>
Authored: Mon Jun 13 16:44:55 2016 -0700
Committer: eshu <es...@pivotal.io>
Committed: Mon Jun 13 16:44:55 2016 -0700
----------------------------------------------------------------------
.../internal/cache/RemoteOperationMessage.java | 10 ++-
.../gemfire/internal/cache/TXManagerImpl.java | 32 ++++-----
.../cache/partitioned/PartitionMessage.java | 10 ++-
.../cache/RemoteOperationMessageTest.java | 31 +++++++--
.../internal/cache/TXManagerImplTest.java | 73 +++++++++++++++-----
.../cache/partitioned/PartitionMessageTest.java | 27 ++++++--
6 files changed, 120 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6f70cd70/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
index 19e1dea..db5bcca 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
@@ -241,8 +241,10 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
sendReply = operateOnRegion(dm, r, startTime);
} else {
try {
- TXId txid = new TXId(getMemberToMasqueradeAs(), getTXUniqId());
- if (!hasTxAlreadyFinished(tx, txMgr, txid)) {
+ if (txMgr.isClosed()) {
+ // NO DISTRIBUTED MESSAGING CAN BE DONE HERE!
+ sendReply = false;
+ } else if (tx.isInProgress()) {
sendReply = operateOnRegion(dm, r, startTime);
}
} finally {
@@ -317,10 +319,6 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
}
}
- boolean hasTxAlreadyFinished(TXStateProxy tx, TXManagerImpl txMgr, TXId txid) {
- return txMgr.hasTxAlreadyFinished(tx, txid);
- }
-
TXManagerImpl getTXManager(GemFireCacheImpl cache) {
return cache.getTxManager();
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6f70cd70/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
index 2608878..1ea7f71 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
@@ -90,7 +90,7 @@ public class TXManagerImpl implements CacheTransactionManager,
private final ArrayList<TransactionListener> txListeners = new ArrayList<TransactionListener>(8);
public TransactionWriter writer = null;
- private boolean closed = false;
+ private volatile boolean closed = false;
private final Map<TXId, TXStateProxy> hostedTXStates;
@@ -577,7 +577,12 @@ public class TXManagerImpl implements CacheTransactionManager,
}
this.closed = true;
for (TXStateProxy proxy: this.hostedTXStates.values()) {
- proxy.close();
+ proxy.getLock().lock();
+ try {
+ proxy.close();
+ } finally {
+ proxy.getLock().unlock();
+ }
}
for (TXStateProxy proxy: this.localTxMap.values()) {
proxy.close();
@@ -646,7 +651,7 @@ public class TXManagerImpl implements CacheTransactionManager,
}
}
- private final boolean isClosed() {
+ public final boolean isClosed() {
return this.closed;
}
private final void checkClosed() {
@@ -752,10 +757,13 @@ public class TXManagerImpl implements CacheTransactionManager,
// Inflight op could be received later than TXFailover operation.
if (curVal == null) {
if (!isHostedTxRecentlyCompleted(key)) {
- this.hostedTXStates.put(key, val);
// Failover op removed the val
// It is possible that the same operation can be executed
// twice by two threads, but data is consistent.
+ this.hostedTXStates.put(key, val);
+ } else {
+ //Another thread should complete the transaction
+ logger.info("{} has already finished." , val.getTxId());
}
} else {
if (val != curVal) {
@@ -770,22 +778,6 @@ public class TXManagerImpl implements CacheTransactionManager,
return true;
}
- public boolean hasTxAlreadyFinished(TXStateProxy tx, TXId txid) {
- if (tx == null) {
- return false;
- }
- if (isHostedTxRecentlyCompleted(txid)) {
- //Should only happen when handling a later arrival of transactional op from proxy,
- //while the transaction has failed over and already committed or rolled back.
- //Just send back reply as a success op.
- //The client connection should be lost from proxy, or
- //the proxy is closed for failover to occur.
- logger.info("TxId {} has already finished." , txid);
- return true;
- }
- return false;
- }
-
/**
* Associate the remote txState with the thread processing this message. Also,
* we acquire a lock on the txState, on which this thread operates.
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6f70cd70/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
index 1b83ee3..14fce08 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
@@ -278,10 +278,6 @@ public abstract class PartitionMessage extends DistributionMessage implements
return (ds == null || ds.isDisconnecting());
}
- boolean hasTxAlreadyFinished(TXStateProxy tx, TXManagerImpl txMgr, TXId txid) {
- return txMgr.hasTxAlreadyFinished(tx, txid);
- }
-
PartitionedRegion getPartitionedRegion() throws PRLocallyDestroyedException {
return PartitionedRegion.getPRFromId(this.regionId);
}
@@ -343,8 +339,10 @@ public abstract class PartitionMessage extends DistributionMessage implements
sendReply = operateOnPartitionedRegion(dm, pr, startTime);
} else {
try {
- TXId txid = new TXId(getMemberToMasqueradeAs(), getTXUniqId());
- if (!hasTxAlreadyFinished(tx, txMgr, txid)) {
+ if (txMgr.isClosed()) {
+ // NO DISTRIBUTED MESSAGING CAN BE DONE HERE!
+ sendReply = false;
+ } else if (tx.isInProgress()) {
sendReply = operateOnPartitionedRegion(dm, pr, startTime);
}
} finally {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6f70cd70/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java
index ecfc2b0..18ce2a9 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java
@@ -25,7 +25,6 @@ import org.mockito.internal.stubbing.answers.CallsRealMethods;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.TXId;
import com.gemstone.gemfire.internal.cache.TXManagerImpl;
import com.gemstone.gemfire.internal.cache.TXStateProxy;
import com.gemstone.gemfire.internal.cache.TXStateProxyImpl;
@@ -40,7 +39,6 @@ public class RemoteOperationMessageTest {
private DistributionManager dm;
private LocalRegion r;
private TXManagerImpl txMgr;
- private TXId txid;
private long startTime = 0;
TXStateProxy tx;
@@ -51,7 +49,6 @@ public class RemoteOperationMessageTest {
msg = mock(RemoteOperationMessage.class);
r = mock(LocalRegion.class);
txMgr = mock(TXManagerImpl.class);
- txid = new TXId(null, 0);
tx = mock(TXStateProxyImpl.class);
when(msg.checkCacheClosing(dm)).thenReturn(false);
@@ -59,9 +56,8 @@ public class RemoteOperationMessageTest {
when(msg.getCache(dm)).thenReturn(cache);
when(msg.getRegionByPath(cache)).thenReturn(r);
when(msg.getTXManager(cache)).thenReturn(txMgr);
- when(txMgr.hasTxAlreadyFinished(tx, txid)).thenCallRealMethod();
- doAnswer(new CallsRealMethods()).when(msg).process(dm);
+ doAnswer(new CallsRealMethods()).when(msg).process(dm);
}
@Test
@@ -75,7 +71,7 @@ public class RemoteOperationMessageTest {
@Test
public void messageForNotFinishedTXPerformsOnRegion() throws InterruptedException, RemoteOperationException {
when(txMgr.masqueradeAs(msg)).thenReturn(tx);
- when(msg.hasTxAlreadyFinished(tx, txMgr, txid)).thenCallRealMethod();
+ when(tx.isInProgress()).thenReturn(true);
msg.process(dm);
verify(msg, times(1)).operateOnRegion(dm, r, startTime);
@@ -84,10 +80,31 @@ public class RemoteOperationMessageTest {
@Test
public void messageForFinishedTXDoesNotPerformOnRegion() throws InterruptedException, RemoteOperationException {
when(txMgr.masqueradeAs(msg)).thenReturn(tx);
- when(msg.hasTxAlreadyFinished(tx, txMgr, txid)).thenReturn(true);
+ when(tx.isInProgress()).thenReturn(false);
msg.process(dm);
verify(msg, times(0)).operateOnRegion(dm, r, startTime);
}
+ @Test
+ public void noNewTxProcessingAfterTXManagerImplClosed() throws RemoteOperationException {
+ txMgr = new TXManagerImpl(null, cache);
+
+ when(msg.checkCacheClosing(dm)).thenReturn(false);
+ when(msg.checkDSClosing(dm)).thenReturn(false);
+ when(msg.getCache(dm)).thenReturn(cache);
+ when(msg.getRegionByPath(cache)).thenReturn(r);
+ when(msg.getTXManager(cache)).thenReturn(txMgr);
+
+ when(msg.canParticipateInTransaction()).thenReturn(true);
+ when(msg.canStartRemoteTransaction()).thenReturn(true);
+
+ msg.process(dm);
+
+ txMgr.close();
+
+ msg.process(dm);
+
+ verify(msg, times(1)).operateOnRegion(dm, r, startTime);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6f70cd70/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java
index ce24947..ae4f378 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java
@@ -49,6 +49,7 @@ public class TXManagerImplTest {
TXStateProxy tx1, tx2;
DistributionManager dm;
TXRemoteRollbackMessage rollbackMsg;
+ TXRemoteCommitMessage commitMsg;
@Before
public void setUp() {
@@ -63,9 +64,11 @@ public class TXManagerImplTest {
notCompletedTxid = new TXId(member, 2);
latch = new CountDownLatch(1);
rollbackMsg = new TXRemoteRollbackMessage();
+ commitMsg = new TXRemoteCommitMessage();
when(this.msg.canStartRemoteTransaction()).thenReturn(true);
when(this.msg.canParticipateInTransaction()).thenReturn(true);
+
}
@Test
@@ -142,9 +145,9 @@ public class TXManagerImplTest {
assertFalse(txMgr.getLock(tx, txid));
}
-
+
@Test
- public void getLockAfterTXStateCommitted() throws InterruptedException{
+ public void getLockAfterTXStateCommitted() throws InterruptedException{
TXStateProxy oldtx = txMgr.getOrSetHostedTXState(txid, msg);
assertEquals(oldtx, txMgr.getHostedTXState(txid));
@@ -157,6 +160,20 @@ public class TXManagerImplTest {
Thread t1 = new Thread(new Runnable() {
public void run() {
+ when(msg.getTXOriginatorClient()).thenReturn(mock(InternalDistributedMember.class));
+ TXStateProxy tx;
+ try {
+ tx = txMgr.masqueradeAs(commitMsg);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ tx.setCommitOnBehalfOfRemoteStub(true);
+ try {
+ tx.commit();
+ } finally {
+ txMgr.unmasquerade(tx);
+ }
txMgr.removeHostedTXState(txid);
txMgr.saveTXCommitMessageForClientFailover(txid, txCommitMsg);
}
@@ -168,11 +185,12 @@ public class TXManagerImplTest {
TXStateProxy curTx = txMgr.getHostedTXState(txid);
assertNull(curTx);
+ assertFalse(tx.isInProgress());
//after TXStateProxy committed, getLock will get the lock for the oldtx
//but caller should not perform ops on this TXStateProxy
assertTrue(txMgr.getLock(tx, txid));
}
-
+
@Test
public void masqueradeAsCanGetLock() throws InterruptedException{
TXStateProxy tx;
@@ -222,21 +240,43 @@ public class TXManagerImplTest {
}
@Test
- public void hasTxAlreadyFinishedDetectsNoTx() {
- assertFalse(txMgr.hasTxAlreadyFinished(null, txid));
+ public void testTxStateWithNotFinishedTx() {
+ TXStateProxy tx = txMgr.getOrSetHostedTXState(notCompletedTxid, msg);
+ assertTrue(tx.isInProgress());
}
-
+
@Test
- public void hasTxAlreadyFinishedDetectsTxNotFinished() {
- TXStateProxy tx = txMgr.getOrSetHostedTXState(notCompletedTxid, msg);
- assertFalse(txMgr.hasTxAlreadyFinished(tx, notCompletedTxid));
+ public void testTxStateWithCommittedTx() throws InterruptedException {
+ when(msg.getTXOriginatorClient()).thenReturn(mock(InternalDistributedMember.class));
+ setupTx();
+
+ TXStateProxy tx = txMgr.masqueradeAs(commitMsg);
+ try {
+ tx.commit();
+ } finally {
+ txMgr.unmasquerade(tx);
+ }
+ assertFalse(tx.isInProgress());
}
@Test
- public void hasTxAlreadyFinishedDetectsTxFinished() throws InterruptedException {
- TXStateProxy tx = txMgr.getOrSetHostedTXState(completedTxid, msg);
- txMgr.saveTXCommitMessageForClientFailover(completedTxid, txCommitMsg);
- assertTrue(txMgr.hasTxAlreadyFinished(tx, completedTxid));
+ public void testTxStateWithRolledBackTx() throws InterruptedException {
+ when(msg.getTXOriginatorClient()).thenReturn(mock(InternalDistributedMember.class));
+ setupTx();
+
+ TXStateProxy tx = txMgr.masqueradeAs(rollbackMsg);
+ try {
+ tx.rollback();
+ } finally {
+ txMgr.unmasquerade(tx);
+ }
+ assertFalse(tx.isInProgress());
+ }
+
+ private void setupTx() throws InterruptedException {
+ TXStateProxy tx = txMgr.masqueradeAs(msg);
+ tx.setCommitOnBehalfOfRemoteStub(true);
+ txMgr.unmasquerade(tx);
}
@Test
@@ -251,11 +291,8 @@ public class TXManagerImplTest {
e.printStackTrace();
throw new RuntimeException(e);
}
- try {
- msg.process(dm);
- } finally {
- txMgr.unmasquerade(tx1);
- }
+
+ msg.process(dm);
TXStateProxy existingTx = masqueradeToRollback();
latch.countDown();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6f70cd70/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java
index bbbf714..e05e130 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java
@@ -31,7 +31,6 @@ import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.internal.cache.DataLocationException;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.cache.TXId;
import com.gemstone.gemfire.internal.cache.TXManagerImpl;
import com.gemstone.gemfire.internal.cache.TXStateProxy;
import com.gemstone.gemfire.internal.cache.TXStateProxyImpl;
@@ -47,7 +46,6 @@ public class PartitionMessageTest {
private DistributionManager dm;
private PartitionedRegion pr;
private TXManagerImpl txMgr;
- private TXId txid;
private long startTime = 1;
TXStateProxy tx;
@@ -59,7 +57,6 @@ public class PartitionMessageTest {
pr = mock(PartitionedRegion.class);
txMgr = mock(TXManagerImpl.class);
tx = mock(TXStateProxyImpl.class);
- txid = new TXId(null, 0);
when(msg.checkCacheClosing(dm)).thenReturn(false);
when(msg.checkDSClosing(dm)).thenReturn(false);
@@ -67,9 +64,8 @@ public class PartitionMessageTest {
when(msg.getGemFireCacheImpl()).thenReturn(cache);
when(msg.getStartPartitionMessageProcessingTime(pr)).thenReturn(startTime);
when(msg.getTXManagerImpl(cache)).thenReturn(txMgr);
- when(msg.hasTxAlreadyFinished(null, txMgr, txid)).thenCallRealMethod();
- doAnswer(new CallsRealMethods()).when(msg).process(dm);
+ doAnswer(new CallsRealMethods()).when(msg).process(dm);
}
@Test
@@ -83,6 +79,7 @@ public class PartitionMessageTest {
@Test
public void messageForNotFinishedTXPerformsOnRegion() throws InterruptedException, CacheException, QueryException, DataLocationException, IOException {
when(txMgr.masqueradeAs(msg)).thenReturn(tx);
+ when(tx.isInProgress()).thenReturn(true);
msg.process(dm);
verify(msg, times(1)).operateOnPartitionedRegion(dm, pr, startTime);
@@ -91,10 +88,28 @@ public class PartitionMessageTest {
@Test
public void messageForFinishedTXDoesNotPerformOnRegion() throws InterruptedException, CacheException, QueryException, DataLocationException, IOException {
when(txMgr.masqueradeAs(msg)).thenReturn(tx);
- when(msg.hasTxAlreadyFinished(tx, txMgr, txid)).thenReturn(true);
+ when(tx.isInProgress()).thenReturn(false);
msg.process(dm);
verify(msg, times(0)).operateOnPartitionedRegion(dm, pr, startTime);
}
+ @Test
+ public void noNewTxProcessingAfterTXManagerImplClosed() throws CacheException, QueryException, DataLocationException, InterruptedException, IOException {
+ txMgr = new TXManagerImpl(null, cache);
+ when(msg.getPartitionedRegion()).thenReturn(pr);
+ when(msg.getGemFireCacheImpl()).thenReturn(cache);
+ when(msg.getStartPartitionMessageProcessingTime(pr)).thenReturn(startTime);
+ when(msg.getTXManagerImpl(cache)).thenReturn(txMgr);
+ when(msg.canParticipateInTransaction()).thenReturn(true);
+ when(msg.canStartRemoteTransaction()).thenReturn(true);
+
+ msg.process(dm);
+
+ txMgr.close();
+
+ msg.process(dm);
+
+ verify(msg, times(1)).operateOnPartitionedRegion(dm, pr, startTime);
+ }
}