You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2016/05/31 21:44:39 UTC
incubator-geode git commit: GEODE-1400: An inflight transaction op
could arrive later than a client failover operation
Repository: incubator-geode
Updated Branches:
refs/heads/develop 8eac0fa8c -> 384d379ae
GEODE-1400: An inflight transaction op could arrive later than a client failover operation
* Handle inflight p2p transaction message received
later than failover message.
* Add unit tests.
* Move hasTxAlreadyFinished method to TXManagerImpl.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/384d379a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/384d379a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/384d379a
Branch: refs/heads/develop
Commit: 384d379ae7040c8fe222afdeab96973f50157296
Parents: 8eac0fa
Author: eshu <es...@pivotal.io>
Authored: Tue May 24 12:04:52 2016 -0700
Committer: eshu <es...@pivotal.io>
Committed: Tue May 31 14:39:42 2016 -0700
----------------------------------------------------------------------
.../internal/cache/RemoteOperationMessage.java | 46 +++-
.../gemfire/internal/cache/TXManagerImpl.java | 71 +++++-
.../cache/partitioned/PartitionMessage.java | 54 ++++-
.../cache/RemoteOperationMessageTest.java | 93 ++++++++
.../internal/cache/TXManagerImplTest.java | 236 +++++++++++++++++++
.../cache/partitioned/PartitionMessageTest.java | 100 ++++++++
6 files changed, 566 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/384d379a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
index 8ffab72..19e1dea 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
@@ -187,7 +187,7 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
/**
* check to see if the cache is closing
*/
- final public boolean checkCacheClosing(DistributionManager dm) {
+ public boolean checkCacheClosing(DistributionManager dm) {
GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
// return (cache != null && cache.isClosed());
return cache == null || cache.isClosed();
@@ -197,11 +197,11 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
* check to see if the distributed system is closing
* @return true if the distributed system is closing
*/
- final public boolean checkDSClosing(DistributionManager dm) {
+ public boolean checkDSClosing(DistributionManager dm) {
InternalDistributedSystem ds = dm.getSystem();
return (ds == null || ds.isDisconnecting());
}
-
+
/**
* Upon receipt of the message, both process the message and send an
* acknowledgement, not necessarily in that order. Note: Any hang in this
@@ -222,8 +222,8 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
thr = new CacheClosedException(LocalizedStrings.PartitionMessage_REMOTE_CACHE_IS_CLOSED_0.toLocalizedString(dm.getId()));
return;
}
- GemFireCacheImpl gfc = (GemFireCacheImpl)CacheFactory.getInstance(dm.getSystem());
- r = gfc.getRegionByPathForProcessing(this.regionPath);
+ GemFireCacheImpl gfc = getCache(dm);
+ r = getRegionByPath(gfc);
if (r == null && failIfRegionMissing()) {
// if the distributed system is disconnecting, don't send a reply saying
// the partitioned region can't be found (bug 36585)
@@ -235,13 +235,19 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
thr = UNHANDLED_EXCEPTION;
// [bruce] r might be null here, so we have to go to the cache instance to get the txmgr
- TXManagerImpl txMgr = GemFireCacheImpl.getInstance().getTxManager();
- TXStateProxy tx = null;
- try {
- tx = txMgr.masqueradeAs(this);
- sendReply = operateOnRegion(dm, r, startTime);
- } finally {
- txMgr.unmasquerade(tx);
+ TXManagerImpl txMgr = getTXManager(gfc);
+ TXStateProxy tx = txMgr.masqueradeAs(this);
+ if (tx == null) {
+ sendReply = operateOnRegion(dm, r, startTime);
+ } else {
+ try {
+ TXId txid = new TXId(getMemberToMasqueradeAs(), getTXUniqId());
+ if (!hasTxAlreadyFinished(tx, txMgr, txid)) {
+ sendReply = operateOnRegion(dm, r, startTime);
+ }
+ } finally {
+ txMgr.unmasquerade(tx);
+ }
}
thr = null;
@@ -310,6 +316,22 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
}
}
}
+
+ boolean hasTxAlreadyFinished(TXStateProxy tx, TXManagerImpl txMgr, TXId txid) {
+ return txMgr.hasTxAlreadyFinished(tx, txid);
+ }
+
+ TXManagerImpl getTXManager(GemFireCacheImpl cache) {
+ return cache.getTxManager();
+ }
+
+ LocalRegion getRegionByPath(GemFireCacheImpl gfc) {
+ return gfc.getRegionByPathForProcessing(this.regionPath);
+ }
+
+ GemFireCacheImpl getCache(final DistributionManager dm) {
+ return (GemFireCacheImpl)CacheFactory.getInstance(dm.getSystem());
+ }
/** Send a generic ReplyMessage. This is in a method so that subclasses can override the reply message type
* @param pr the Partitioned Region for the message whose statistics are incremented
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/384d379a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
index dd4653c..49926e6 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
@@ -84,7 +84,7 @@ import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMa
*
* @see CacheTransactionManager
*/
-public final class TXManagerImpl implements CacheTransactionManager,
+public class TXManagerImpl implements CacheTransactionManager,
MembershipListener {
private static final Logger logger = LogService.getLogger();
@@ -729,8 +729,26 @@ public final class TXManagerImpl implements CacheTransactionManager,
return null;
}
TXId key = new TXId(msg.getMemberToMasqueradeAs(), msg.getTXUniqId());
- TXStateProxy val;
- val = this.hostedTXStates.get(key);
+ TXStateProxy val = getOrSetHostedTXState(key, msg);
+
+ if (val != null) {
+ boolean success = getLock(val, key);
+ while (!success) {
+ val = getOrSetHostedTXState(key, msg);
+ if (val != null) {
+ success = getLock(val, key);
+ } else {
+ break;
+ }
+ }
+ }
+
+ setTXState(val);
+ return val;
+ }
+
+ TXStateProxy getOrSetHostedTXState(TXId key, TransactionMessage msg) {
+ TXStateProxy val = this.hostedTXStates.get(key);
if (val == null) {
synchronized(this.hostedTXStates) {
val = this.hostedTXStates.get(key);
@@ -746,14 +764,49 @@ public final class TXManagerImpl implements CacheTransactionManager,
}
}
}
- if (val != null) {
- if (!val.getLock().isHeldByCurrentThread()) {
- val.getLock().lock();
+ return val;
+ }
+
+ boolean getLock(TXStateProxy val, TXId key) {
+ if (!val.getLock().isHeldByCurrentThread()) {
+ val.getLock().lock();
+ synchronized (this.hostedTXStates) {
+ TXStateProxy curVal = this.hostedTXStates.get(key);
+ // Inflight op could be received later than TXFailover operation.
+ if (curVal == null) {
+ if (!isHostedTxRecentlyCompleted(key)) {
+ this.hostedTXStates.put(key, val);
+ // Failover op removed the val
+ // It is possible that the same operation can be executed
+ // twice by two threads, but data is consistent.
+ }
+ } else {
+ if (val != curVal) {
+ //Failover op replaced with a new TXStateProxyImpl
+ //Use the new one instead.
+ val.getLock().unlock();
+ return false;
+ }
+ }
}
}
-
- setTXState(val);
- return val;
+ return true;
+ }
+
+ public boolean hasTxAlreadyFinished(TXStateProxy tx, TXId txid) {
+ if (tx == null) {
+ return false;
+ }
+ if (isHostedTxRecentlyCompleted(txid)) {
+ //Should only happen when handling a later arrival of transactional op from proxy,
+ //while the transaction has failed over and already committed or rolled back.
+ //Just send back reply as a success op.
+ //The client connection should be lost from proxy, or
+ //the proxy is closed for failover to occur.
+ logger.info("TxId {} has already finished." , txid);
+ return true;
+ }
+ return false;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/384d379a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
index db4cc59..9c54587 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
@@ -58,6 +58,7 @@ import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegionException;
import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
+import com.gemstone.gemfire.internal.cache.TXId;
import com.gemstone.gemfire.internal.cache.TXManagerImpl;
import com.gemstone.gemfire.internal.cache.TXStateProxy;
import com.gemstone.gemfire.internal.cache.TransactionMessage;
@@ -262,8 +263,8 @@ public abstract class PartitionMessage extends DistributionMessage implements
/**
* check to see if the cache is closing
*/
- final public boolean checkCacheClosing(DistributionManager dm) {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ public boolean checkCacheClosing(DistributionManager dm) {
+ GemFireCacheImpl cache = getGemFireCacheImpl();
// return (cache != null && cache.isClosed());
return cache == null || cache.isClosed();
}
@@ -272,11 +273,32 @@ public abstract class PartitionMessage extends DistributionMessage implements
* check to see if the distributed system is closing
* @return true if the distributed system is closing
*/
- final public boolean checkDSClosing(DistributionManager dm) {
+ public boolean checkDSClosing(DistributionManager dm) {
InternalDistributedSystem ds = dm.getSystem();
return (ds == null || ds.isDisconnecting());
}
+ boolean hasTxAlreadyFinished(TXStateProxy tx, TXManagerImpl txMgr, TXId txid) {
+ return txMgr.hasTxAlreadyFinished(tx, txid);
+ }
+
+ PartitionedRegion getPartitionedRegion() throws PRLocallyDestroyedException {
+ return PartitionedRegion.getPRFromId(this.regionId);
+ }
+
+ GemFireCacheImpl getGemFireCacheImpl() {
+ return GemFireCacheImpl.getInstance();
+ }
+
+ TXManagerImpl getTXManagerImpl(GemFireCacheImpl cache) {
+ return cache.getTxManager();
+ }
+
+ long getStartPartitionMessageProcessingTime(PartitionedRegion pr) {
+ return pr.getPrStats().startPartitionMessageProcessing();
+ }
+
+
/**
* Upon receipt of the message, both process the message and send an
* acknowledgement, not necessarily in that order. Note: Any hang in this
@@ -298,7 +320,7 @@ public abstract class PartitionMessage extends DistributionMessage implements
thr = new CacheClosedException(LocalizedStrings.PartitionMessage_REMOTE_CACHE_IS_CLOSED_0.toLocalizedString(dm.getId()));
return;
}
- pr = PartitionedRegion.getPRFromId(this.regionId);
+ pr = getPartitionedRegion();
if (pr == null && failIfRegionMissing()) {
// if the distributed system is disconnecting, don't send a reply saying
// the partitioned region can't be found (bug 36585)
@@ -307,21 +329,27 @@ public abstract class PartitionMessage extends DistributionMessage implements
}
if (pr != null) {
- startTime = pr.getPrStats().startPartitionMessageProcessing();
+ startTime = getStartPartitionMessageProcessingTime(pr);
}
thr = UNHANDLED_EXCEPTION;
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ GemFireCacheImpl cache = getGemFireCacheImpl();
if(cache==null) {
throw new ForceReattemptException(LocalizedStrings.PartitionMessage_REMOTE_CACHE_IS_CLOSED_0.toLocalizedString());
}
- TXManagerImpl txMgr = cache.getTxManager();
- TXStateProxy tx = null;
- try {
- tx = txMgr.masqueradeAs(this);
- sendReply = operateOnPartitionedRegion(dm, pr, startTime);
- } finally {
- txMgr.unmasquerade(tx);
+ TXManagerImpl txMgr = getTXManagerImpl(cache);
+ TXStateProxy tx = txMgr.masqueradeAs(this);
+ if (tx == null) {
+ sendReply = operateOnPartitionedRegion(dm, pr, startTime);
+ } else {
+ try {
+ TXId txid = new TXId(getMemberToMasqueradeAs(), getTXUniqId());
+ if (!hasTxAlreadyFinished(tx, txMgr, txid)) {
+ sendReply = operateOnPartitionedRegion(dm, pr, startTime);
+ }
+ } finally {
+ txMgr.unmasquerade(tx);
+ }
}
thr = null;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/384d379a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java
new file mode 100644
index 0000000..ecfc2b0
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import static org.mockito.Mockito.*;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.internal.stubbing.answers.CallsRealMethods;
+
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.TXId;
+import com.gemstone.gemfire.internal.cache.TXManagerImpl;
+import com.gemstone.gemfire.internal.cache.TXStateProxy;
+import com.gemstone.gemfire.internal.cache.TXStateProxyImpl;
+import com.gemstone.gemfire.test.fake.Fakes;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+
+@Category(UnitTest.class)
+public class RemoteOperationMessageTest {
+ private GemFireCacheImpl cache;
+ private RemoteOperationMessage msg;
+ private DistributionManager dm;
+ private LocalRegion r;
+ private TXManagerImpl txMgr;
+ private TXId txid;
+ private long startTime = 0;
+ TXStateProxy tx;
+
+ @Before
+ public void setUp() throws InterruptedException {
+ cache = Fakes.cache();
+ dm = mock(DistributionManager.class);
+ msg = mock(RemoteOperationMessage.class);
+ r = mock(LocalRegion.class);
+ txMgr = mock(TXManagerImpl.class);
+ txid = new TXId(null, 0);
+ tx = mock(TXStateProxyImpl.class);
+
+ when(msg.checkCacheClosing(dm)).thenReturn(false);
+ when(msg.checkDSClosing(dm)).thenReturn(false);
+ when(msg.getCache(dm)).thenReturn(cache);
+ when(msg.getRegionByPath(cache)).thenReturn(r);
+ when(msg.getTXManager(cache)).thenReturn(txMgr);
+ when(txMgr.hasTxAlreadyFinished(tx, txid)).thenCallRealMethod();
+
+ doAnswer(new CallsRealMethods()).when(msg).process(dm);
+ }
+
+ @Test
+ public void messageWithNoTXPerformsOnRegion() throws InterruptedException, RemoteOperationException {
+ when(txMgr.masqueradeAs(msg)).thenReturn(null);
+ msg.process(dm);
+
+ verify(msg, times(1)).operateOnRegion(dm, r, startTime);
+ }
+
+ @Test
+ public void messageForNotFinishedTXPerformsOnRegion() throws InterruptedException, RemoteOperationException {
+ when(txMgr.masqueradeAs(msg)).thenReturn(tx);
+ when(msg.hasTxAlreadyFinished(tx, txMgr, txid)).thenCallRealMethod();
+ msg.process(dm);
+
+ verify(msg, times(1)).operateOnRegion(dm, r, startTime);
+ }
+
+ @Test
+ public void messageForFinishedTXDoesNotPerformOnRegion() throws InterruptedException, RemoteOperationException {
+ when(txMgr.masqueradeAs(msg)).thenReturn(tx);
+ when(msg.hasTxAlreadyFinished(tx, txMgr, txid)).thenReturn(true);
+ msg.process(dm);
+
+ verify(msg, times(0)).operateOnRegion(dm, r, startTime);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/384d379a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java
new file mode 100644
index 0000000..a4b8127
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.internal.cache;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.partitioned.DestroyMessage;
+import com.gemstone.gemfire.test.fake.Fakes;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+import com.jayway.awaitility.Awaitility;
+
+
+@Category(UnitTest.class)
+public class TXManagerImplTest {
+ private TXManagerImpl txMgr;
+ TXId txid;
+ DestroyMessage msg;
+ TXCommitMessage txCommitMsg;
+ TXId completedTxid;
+ TXId notCompletedTxid;
+ InternalDistributedMember member;
+ CountDownLatch latch = new CountDownLatch(1);
+ TXStateProxy tx1, tx2;
+
+ @Before
+ public void setUp() {
+ Cache cache = Fakes.cache();
+ txMgr = new TXManagerImpl(null, cache);
+ txid = new TXId(null, 0);
+ msg = mock(DestroyMessage.class);
+ txCommitMsg = mock(TXCommitMessage.class);
+ member = mock(InternalDistributedMember.class);
+ completedTxid = new TXId(member, 1);
+ notCompletedTxid = new TXId(member, 2);
+
+ when(this.msg.canStartRemoteTransaction()).thenReturn(true);
+ when(this.msg.canParticipateInTransaction()).thenReturn(true);
+ }
+
+ @Test
+ public void getOrSetHostedTXStateAbleToSetTXStateAndGetLock(){
+ TXStateProxy tx = txMgr.getOrSetHostedTXState(txid, msg);
+
+ assertNotNull(tx);
+ assertEquals(tx, txMgr.getHostedTXState(txid));
+ assertTrue(txMgr.getLock(tx, txid));
+ }
+
+ @Test
+ public void getLockAfterTXStateRemoved() throws InterruptedException{
+ TXStateProxy tx = txMgr.getOrSetHostedTXState(txid, msg);
+
+ assertEquals(tx, txMgr.getHostedTXState(txid));
+ assertTrue(txMgr.getLock(tx, txid));
+ assertNotNull(tx);
+ assertTrue(txMgr.getLock(tx, txid));
+ tx.getLock().unlock();
+
+ TXStateProxy oldtx = txMgr.getOrSetHostedTXState(txid, msg);
+ assertEquals(tx, oldtx);
+
+ Thread t1 = new Thread(new Runnable() {
+ public void run() {
+ txMgr.removeHostedTXState(txid);
+ }
+ });
+ t1.start();
+
+ t1.join();
+
+ TXStateProxy curTx = txMgr.getHostedTXState(txid);
+ assertNull(curTx);
+
+ //after failover command removed the txid from hostedTXState,
+ //getLock should put back the original TXStateProxy
+ assertTrue(txMgr.getLock(tx, txid));
+ assertEquals(tx, txMgr.getHostedTXState(txid));
+
+ tx.getLock().unlock();
+ }
+
+ @Test
+ public void getLockAfterTXStateReplaced() throws InterruptedException{
+ TXStateProxy oldtx = txMgr.getOrSetHostedTXState(txid, msg);
+
+ assertEquals(oldtx, txMgr.getHostedTXState(txid));
+ assertTrue(txMgr.getLock(oldtx, txid));
+ assertNotNull(oldtx);
+ oldtx.getLock().unlock();
+
+ TXStateProxy tx = txMgr.getOrSetHostedTXState(txid, msg);
+ assertEquals(tx, oldtx);
+
+ Thread t1 = new Thread(new Runnable() {
+ public void run() {
+ txMgr.removeHostedTXState(txid);
+ //replace with new TXState
+ txMgr.getOrSetHostedTXState(txid, msg);
+ }
+ });
+ t1.start();
+
+ t1.join();
+
+ TXStateProxy curTx = txMgr.getHostedTXState(txid);
+ assertNotNull(curTx);
+ //replaced
+ assertNotEquals(tx, curTx);
+
+ //after TXStateProxy replaced, getLock will not get
+ assertFalse(txMgr.getLock(tx, txid));
+
+ }
+
+ @Test
+ public void getLockAfterTXStateCommitted() throws InterruptedException{
+ TXStateProxy oldtx = txMgr.getOrSetHostedTXState(txid, msg);
+
+ assertEquals(oldtx, txMgr.getHostedTXState(txid));
+ assertTrue(txMgr.getLock(oldtx, txid));
+ assertNotNull(oldtx);
+ oldtx.getLock().unlock();
+
+ TXStateProxy tx = txMgr.getOrSetHostedTXState(txid, msg);
+ assertEquals(tx, oldtx);
+
+ Thread t1 = new Thread(new Runnable() {
+ public void run() {
+ txMgr.removeHostedTXState(txid);
+ txMgr.saveTXCommitMessageForClientFailover(txid, txCommitMsg);
+ }
+ });
+ t1.start();
+
+ t1.join();
+
+ TXStateProxy curTx = txMgr.getHostedTXState(txid);
+ assertNull(curTx);
+
+ //after TXStateProxy committed, getLock will get the lock for the oldtx
+ //but caller should not perform ops on this TXStateProxy
+ assertTrue(txMgr.getLock(tx, txid));
+ }
+
+ @Test
+ public void masqueradeAsCanGetLock() throws InterruptedException{
+ TXStateProxy tx;
+
+ tx = txMgr.masqueradeAs(msg);
+ assertNotNull(tx);
+ }
+
+ @Test
+ public void masqueradeAsCanGetLockAfterTXStateIsReplaced() throws InterruptedException{
+ TXStateProxy tx;
+
+ Thread t1 = new Thread(new Runnable() {
+ public void run() {
+ tx1 = txMgr.getHostedTXState(txid);
+ assertNull(tx1);
+ tx1 =txMgr.getOrSetHostedTXState(txid, msg);
+ assertNotNull(tx1);
+ assertTrue(txMgr.getLock(tx1, txid));
+
+ latch.countDown();
+
+ Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS).pollDelay(10, TimeUnit.MILLISECONDS)
+ .atMost(30, TimeUnit.SECONDS).until(() -> tx1.getLock().hasQueuedThreads());
+
+ txMgr.removeHostedTXState(txid);
+
+ tx2 =txMgr.getOrSetHostedTXState(txid, msg);
+ assertNotNull(tx2);
+ assertTrue(txMgr.getLock(tx2, txid));
+
+ tx2.getLock().unlock();
+ tx1.getLock().unlock();
+ }
+ });
+ t1.start();
+
+ assertTrue(latch.await(60, TimeUnit.SECONDS));
+
+ tx = txMgr.masqueradeAs(msg);
+ assertNotNull(tx);
+ assertEquals(tx, tx2);
+ tx.getLock().unlock();
+
+ t1.join();
+
+ }
+
+ @Test
+ public void hasTxAlreadyFinishedDetectsNoTx() {
+ assertFalse(txMgr.hasTxAlreadyFinished(null, txid));
+ }
+
+ @Test
+ public void hasTxAlreadyFinishedDetectsTxNotFinished() {
+ TXStateProxy tx = txMgr.getOrSetHostedTXState(notCompletedTxid, msg);
+ assertFalse(txMgr.hasTxAlreadyFinished(tx, notCompletedTxid));
+ }
+
+ @Test
+ public void hasTxAlreadyFinishedDetectsTxFinished() throws InterruptedException {
+ TXStateProxy tx = txMgr.getOrSetHostedTXState(completedTxid, msg);
+ txMgr.saveTXCommitMessageForClientFailover(completedTxid, txCommitMsg);
+ assertTrue(txMgr.hasTxAlreadyFinished(tx, completedTxid));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/384d379a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java
new file mode 100644
index 0000000..bbbf714
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.partitioned;
+
+import static org.mockito.Mockito.*;
+
+import java.io.IOException;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.internal.stubbing.answers.CallsRealMethods;
+
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.query.QueryException;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.internal.cache.DataLocationException;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.TXId;
+import com.gemstone.gemfire.internal.cache.TXManagerImpl;
+import com.gemstone.gemfire.internal.cache.TXStateProxy;
+import com.gemstone.gemfire.internal.cache.TXStateProxyImpl;
+import com.gemstone.gemfire.test.fake.Fakes;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+
+@Category(UnitTest.class)
+public class PartitionMessageTest {
+
+ private GemFireCacheImpl cache;
+ private PartitionMessage msg;
+ private DistributionManager dm;
+ private PartitionedRegion pr;
+ private TXManagerImpl txMgr;
+ private TXId txid;
+ private long startTime = 1;
+ TXStateProxy tx;
+
+ @Before
+ public void setUp() throws PRLocallyDestroyedException, InterruptedException {
+ cache = Fakes.cache();
+ dm = mock(DistributionManager.class);
+ msg = mock(PartitionMessage.class);
+ pr = mock(PartitionedRegion.class);
+ txMgr = mock(TXManagerImpl.class);
+ tx = mock(TXStateProxyImpl.class);
+ txid = new TXId(null, 0);
+
+ when(msg.checkCacheClosing(dm)).thenReturn(false);
+ when(msg.checkDSClosing(dm)).thenReturn(false);
+ when(msg.getPartitionedRegion()).thenReturn(pr);
+ when(msg.getGemFireCacheImpl()).thenReturn(cache);
+ when(msg.getStartPartitionMessageProcessingTime(pr)).thenReturn(startTime);
+ when(msg.getTXManagerImpl(cache)).thenReturn(txMgr);
+ when(msg.hasTxAlreadyFinished(null, txMgr, txid)).thenCallRealMethod();
+
+ doAnswer(new CallsRealMethods()).when(msg).process(dm);
+ }
+
+ @Test
+ public void messageWithNoTXPerformsOnRegion() throws InterruptedException, CacheException, QueryException, DataLocationException, IOException {
+ when(txMgr.masqueradeAs(msg)).thenReturn(null);
+ msg.process(dm);
+
+ verify(msg, times(1)).operateOnPartitionedRegion(dm, pr, startTime);
+ }
+
+ @Test
+ public void messageForNotFinishedTXPerformsOnRegion() throws InterruptedException, CacheException, QueryException, DataLocationException, IOException {
+ when(txMgr.masqueradeAs(msg)).thenReturn(tx);
+ msg.process(dm);
+
+ verify(msg, times(1)).operateOnPartitionedRegion(dm, pr, startTime);
+ }
+
+ @Test
+ public void messageForFinishedTXDoesNotPerformOnRegion() throws InterruptedException, CacheException, QueryException, DataLocationException, IOException {
+ when(txMgr.masqueradeAs(msg)).thenReturn(tx);
+ when(msg.hasTxAlreadyFinished(tx, txMgr, txid)).thenReturn(true);
+ msg.process(dm);
+
+ verify(msg, times(0)).operateOnPartitionedRegion(dm, pr, startTime);
+ }
+
+}