You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/06/07 21:53:06 UTC
[07/17] incubator-geode git commit: GEODE-1491: Make sure when
checking if a transaction is completed from isHostedTxRecentlyCompleted()
method, the rolled back transaction is considered as well.
GEODE-1491: Make sure that when isHostedTxRecentlyCompleted() checks whether a transaction has completed, rolled-back transactions are considered as well.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/0815e1b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/0815e1b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/0815e1b6
Branch: refs/heads/feature/GEODE-837
Commit: 0815e1b6e9f11881b03780ac730856ee5db36c9c
Parents: 001a4e1
Author: eshu <es...@pivotal.io>
Authored: Mon Jun 6 12:38:25 2016 -0700
Committer: eshu <es...@pivotal.io>
Committed: Mon Jun 6 12:38:25 2016 -0700
----------------------------------------------------------------------
.../gemfire/internal/cache/TXManagerImpl.java | 20 +++---
.../cache/partitioned/PartitionMessage.java | 2 +-
.../tier/sockets/command/RollbackCommand.java | 5 ++
.../internal/cache/TXManagerImplTest.java | 70 ++++++++++++++++++--
4 files changed, 83 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0815e1b6/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
index df0176d..2608878 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
@@ -1021,16 +1021,18 @@ public class TXManagerImpl implements CacheTransactionManager,
}
public boolean isHostedTxRecentlyCompleted(TXId txId) {
- // if someone is asking to see if we have the txId, they will come
- // back and ask for the commit message, this could take a long time
- // specially when called from TXFailoverCommand, so we move
- // the txId to the front of the queue
- TXCommitMessage msg = failoverMap.remove(txId);
- if (msg != null) {
- failoverMap.put(txId, msg);
- return true;
+ synchronized(failoverMap) {
+ if (failoverMap.containsKey(txId)) {
+ // if someone is asking to see if we have the txId, they will come
+ // back and ask for the commit message, this could take a long time
+ // especially when called from TXFailoverCommand, so we move
+ // the txId back to the linked map by removing and putting it back.
+ TXCommitMessage msg = failoverMap.remove(txId);
+ failoverMap.put(txId, msg);
+ return true;
+ }
+ return false;
}
- return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0815e1b6/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
index 9c54587..c2ab27e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
@@ -186,7 +186,7 @@ public abstract class PartitionMessage extends DistributionMessage implements
* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TransactionMessage#getTXOriginatorClient()
*/
- public final InternalDistributedMember getTXOriginatorClient() {
+ public InternalDistributedMember getTXOriginatorClient() {
return txMemberId;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0815e1b6/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/RollbackCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/RollbackCommand.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/RollbackCommand.java
index ed7c706..f0316bd 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/RollbackCommand.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/RollbackCommand.java
@@ -63,6 +63,11 @@ public class RollbackCommand extends BaseCommand {
txId = txState.getTxId();
txMgr.rollback();
sendRollbackReply(msg, servConn);
+ } else {
+ //Could not find the TxState in the host server.
+ //Protect against a failover command that is received so late that
+ //the transaction has already been removed from the failoverMap due to capacity.
+ sendRollbackReply(msg, servConn);
}
} catch (Exception e) {
writeException(msg, e, false, servConn);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0815e1b6/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java
index a4b8127..ce24947 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.cache.partitioned.DestroyMessage;
import com.gemstone.gemfire.test.fake.Fakes;
@@ -44,24 +45,29 @@ public class TXManagerImplTest {
TXId completedTxid;
TXId notCompletedTxid;
InternalDistributedMember member;
- CountDownLatch latch = new CountDownLatch(1);
+ CountDownLatch latch;
TXStateProxy tx1, tx2;
+ DistributionManager dm;
+ TXRemoteRollbackMessage rollbackMsg;
@Before
public void setUp() {
Cache cache = Fakes.cache();
- txMgr = new TXManagerImpl(null, cache);
+ dm = mock(DistributionManager.class);
+ txMgr = new TXManagerImpl(mock(CachePerfStats.class), cache);
txid = new TXId(null, 0);
msg = mock(DestroyMessage.class);
txCommitMsg = mock(TXCommitMessage.class);
member = mock(InternalDistributedMember.class);
completedTxid = new TXId(member, 1);
notCompletedTxid = new TXId(member, 2);
+ latch = new CountDownLatch(1);
+ rollbackMsg = new TXRemoteRollbackMessage();
when(this.msg.canStartRemoteTransaction()).thenReturn(true);
when(this.msg.canParticipateInTransaction()).thenReturn(true);
}
-
+
@Test
public void getOrSetHostedTXStateAbleToSetTXStateAndGetLock(){
TXStateProxy tx = txMgr.getOrSetHostedTXState(txid, msg);
@@ -232,5 +238,61 @@ public class TXManagerImplTest {
txMgr.saveTXCommitMessageForClientFailover(completedTxid, txCommitMsg);
assertTrue(txMgr.hasTxAlreadyFinished(tx, completedTxid));
}
-
+
+ @Test
+ public void txRolledbackShouldCompleteTx() throws InterruptedException {
+ when(msg.getTXOriginatorClient()).thenReturn(mock(InternalDistributedMember.class));
+
+ Thread t1 = new Thread(new Runnable() {
+ public void run() {
+ try {
+ tx1 = txMgr.masqueradeAs(msg);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ try {
+ msg.process(dm);
+ } finally {
+ txMgr.unmasquerade(tx1);
+ }
+
+ TXStateProxy existingTx = masqueradeToRollback();
+ latch.countDown();
+ Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS).pollDelay(10, TimeUnit.MILLISECONDS)
+ .atMost(30, TimeUnit.SECONDS).until(() -> tx1.getLock().hasQueuedThreads());
+
+ rollbackTransaction(existingTx);
+ }
+ });
+ t1.start();
+
+ assertTrue(latch.await(60, TimeUnit.SECONDS));
+
+ TXStateProxy tx = txMgr.masqueradeAs(rollbackMsg);
+ assertEquals(tx, tx1);
+ t1.join();
+ rollbackTransaction(tx);
+ }
+
+ private TXStateProxy masqueradeToRollback() {
+ TXStateProxy existingTx;
+ try {
+ existingTx = txMgr.masqueradeAs(rollbackMsg);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ return existingTx;
+ }
+
+ private void rollbackTransaction(TXStateProxy existingTx) {
+ try {
+ if (!txMgr.isHostedTxRecentlyCompleted(txid)) {
+ txMgr.rollback();
+ }
+ } finally {
+ txMgr.unmasquerade(existingTx);
+ }
+ }
}