You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2016/05/24 19:12:34 UTC
[1/2] incubator-geode git commit: Handle inflight p2p transaction
message received later than failover message. Add unit test cases.
Repository: incubator-geode
Updated Branches:
refs/heads/feature/GEODE-1400 [created] 2d70868b5
Handle inflight p2p transaction message received later than failover message.
Add unit test cases.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/8a6a805b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/8a6a805b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/8a6a805b
Branch: refs/heads/feature/GEODE-1400
Commit: 8a6a805b423095c87f63a75e32498a2964aba08b
Parents: 2234535
Author: eshu <es...@pivotal.io>
Authored: Tue May 24 12:04:52 2016 -0700
Committer: eshu <es...@pivotal.io>
Committed: Tue May 24 12:04:52 2016 -0700
----------------------------------------------------------------------
.../cache/RemoteOperationMessageTest.java | 121 ++++++++++
.../internal/cache/TXManagerImplTest.java | 230 +++++++++++++++++++
.../cache/partitioned/PartitionMessageTest.java | 131 +++++++++++
3 files changed, 482 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8a6a805b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java
new file mode 100644
index 0000000..f965a45
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import static org.mockito.Mockito.*;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.internal.stubbing.answers.CallsRealMethods;
+
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.TXId;
+import com.gemstone.gemfire.internal.cache.TXManagerImpl;
+import com.gemstone.gemfire.internal.cache.TXStateProxy;
+import com.gemstone.gemfire.internal.cache.TXStateProxyImpl;
+import com.gemstone.gemfire.test.fake.Fakes;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+
+@Category(UnitTest.class)
+public class RemoteOperationMessageTest {
+ private GemFireCacheImpl cache;
+ private RemoteOperationMessage msg;
+ private DistributionManager dm;
+ private LocalRegion r;
+ private TXManagerImpl txMgr;
+ private TXId txid;
+ private long startTime = 0;
+ TXStateProxy tx;
+
+ @Before
+ public void setUp() {
+ cache = Fakes.cache();
+ dm = mock(DistributionManager.class);
+ msg = mock(RemoteOperationMessage.class);
+ r = mock(LocalRegion.class);
+ txMgr = new TXManagerImpl(null, cache);
+ txid = new TXId(null, 0);
+ tx = mock(TXStateProxyImpl.class);
+
+ when(msg.checkCacheClosing(dm)).thenReturn(false);
+ when(msg.checkDSClosing(dm)).thenReturn(false);
+ when(msg.getCache(dm)).thenReturn(cache);
+ when(msg.getRegionByPath(cache)).thenReturn(r);
+ when(msg.getTXManager(cache)).thenReturn(txMgr);
+
+ doAnswer(new CallsRealMethods()).when(msg).process(dm);
+ }
+
+ @Test
+ public void messageWithoutTxPerformsOnRegion() {
+ try {
+ when(msg.masqueradeAs(msg, txMgr)).thenReturn(null);
+ } catch (InterruptedException e1) {
+ e1.printStackTrace();
+ }
+ when(msg.hasTxAlreadyFinished(null, txMgr)).thenCallRealMethod();
+ msg.process(dm);
+
+ try {
+ verify(msg, times(1)).operateOnRegion(dm, r, startTime);
+ } catch (RemoteOperationException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ @Test
+ public void messageForUnFinishedTXPerformsOnRegion() {
+ try {
+ when(msg.masqueradeAs(msg, txMgr)).thenReturn(tx);
+ } catch (InterruptedException e1) {
+ e1.printStackTrace();
+ }
+
+ when(msg.hasTxAlreadyFinished(tx, txMgr)).thenCallRealMethod();
+ msg.process(dm);
+
+ try {
+ verify(msg, times(1)).operateOnRegion(dm, r, startTime);
+ } catch (RemoteOperationException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void messageForFinishedTXDoesNotPerformOnRegion() {
+ try {
+ when(msg.masqueradeAs(msg, txMgr)).thenReturn(tx);
+ } catch (InterruptedException e1) {
+ e1.printStackTrace();
+ }
+
+ when(msg.hasTXRecentlyCompleted(txid, txMgr)).thenReturn(true);
+ when(msg.hasTxAlreadyFinished(tx, txMgr)).thenCallRealMethod();
+ msg.process(dm);
+
+ try {
+ verify(msg, times(0)).operateOnRegion(dm, r, startTime);
+ } catch (RemoteOperationException e) {
+ e.printStackTrace();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8a6a805b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java
new file mode 100644
index 0000000..d55e045
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.internal.cache;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.internal.cache.partitioned.DestroyMessage;
+import com.gemstone.gemfire.test.fake.Fakes;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+
+@Category(UnitTest.class)
+public class TXManagerImplTest {
+ private TXManagerImpl txMgr;
+ TXId txid;
+ DestroyMessage msg;
+ TXCommitMessage txCommitMsg;
+ TXStateProxy globaltx1;
+ TXStateProxy globaltx2;
+
+ @Before
+ public void setUp() {
+ Cache cache = Fakes.cache();
+ txMgr = new TXManagerImpl(null, cache);
+ txid = new TXId(null, 0);
+ msg = mock(DestroyMessage.class);
+ txCommitMsg = mock(TXCommitMessage.class);
+
+ when(this.msg.canStartRemoteTransaction()).thenReturn(true);
+ when(this.msg.canParticipateInTransaction()).thenReturn(true);
+ }
+
+ @Test
+ public void getOrSetHostedTXStateAbleToSetTXStateAndGetLock(){
+ TXStateProxy tx = txMgr.getOrSetHostedTXState(txid, msg);
+
+ assertNotNull(tx);
+ assertEquals(tx, txMgr.getHostedTXState(txid));
+ assertTrue(txMgr.getLock(tx, txid));
+ }
+
+ @Test
+ public void getLockAfterTXStateRemoved(){
+ TXStateProxy tx = txMgr.getOrSetHostedTXState(txid, msg);
+
+ assertEquals(tx, txMgr.getHostedTXState(txid));
+ assertTrue(txMgr.getLock(tx, txid));
+ assertNotNull(tx);
+ assertTrue(txMgr.getLock(tx, txid));
+ tx.getLock().unlock();
+
+ TXStateProxy oldtx = txMgr.getOrSetHostedTXState(txid, msg);
+ assertEquals(tx, oldtx);
+
+ Thread t1 = new Thread(new Runnable() {
+ public void run() {
+ txMgr.removeHostedTXState(txid);
+ }
+ });
+ t1.start();
+
+ try {
+ t1.join();
+ } catch (InterruptedException e) {
+
+ }
+
+ TXStateProxy curTx = txMgr.getHostedTXState(txid);
+ assertNull(curTx);
+
+ //after failover command removed the txid from hostedTXState,
+ //getLock should put back the original TXStateProxy
+ assertTrue(txMgr.getLock(tx, txid));
+ assertEquals(tx, txMgr.getHostedTXState(txid));
+
+ tx.getLock().unlock();
+ }
+
+ @Test
+ public void getLockAfterTXStateReplaced(){
+ TXStateProxy oldtx = txMgr.getOrSetHostedTXState(txid, msg);
+
+ assertEquals(oldtx, txMgr.getHostedTXState(txid));
+ assertTrue(txMgr.getLock(oldtx, txid));
+ assertNotNull(oldtx);
+ oldtx.getLock().unlock();
+
+ TXStateProxy tx = txMgr.getOrSetHostedTXState(txid, msg);
+ assertEquals(tx, oldtx);
+
+ Thread t1 = new Thread(new Runnable() {
+ public void run() {
+ txMgr.removeHostedTXState(txid);
+ //replace with new TXState
+ txMgr.getOrSetHostedTXState(txid, msg);
+ }
+ });
+ t1.start();
+
+ try {
+ t1.join();
+ } catch (InterruptedException e) {
+
+ }
+
+ TXStateProxy curTx = txMgr.getHostedTXState(txid);
+ assertNotNull(curTx);
+ //replaced
+ assertNotEquals(tx, curTx);
+
+ //after TXStateProxy replaced, getLock will not get
+ assertFalse(txMgr.getLock(tx, txid));
+
+ }
+
+ @Test
+ public void getLockAfterTXStateCommitted(){
+ TXStateProxy oldtx = txMgr.getOrSetHostedTXState(txid, msg);
+
+ assertEquals(oldtx, txMgr.getHostedTXState(txid));
+ assertTrue(txMgr.getLock(oldtx, txid));
+ assertNotNull(oldtx);
+ oldtx.getLock().unlock();
+
+ TXStateProxy tx = txMgr.getOrSetHostedTXState(txid, msg);
+ assertEquals(tx, oldtx);
+
+ Thread t1 = new Thread(new Runnable() {
+ public void run() {
+ txMgr.removeHostedTXState(txid);
+ txMgr.saveTXCommitMessageForClientFailover(txid, txCommitMsg);
+ }
+ });
+ t1.start();
+
+ try {
+ t1.join();
+ } catch (InterruptedException e) {
+
+ }
+
+ TXStateProxy curTx = txMgr.getHostedTXState(txid);
+ assertNull(curTx);
+
+ //after TXStateProxy committed, getLock will get the lock for the oldtx
+ //but caller should not perform ops on this TXStateProxy
+ assertTrue(txMgr.getLock(tx, txid));
+ }
+
+ @Test
+ public void masqueradeAsCanGetLock(){
+ TXStateProxy tx;
+ try {
+ tx = txMgr.masqueradeAs(msg);
+ assertNotNull(tx);
+ } catch (InterruptedException ie) {
+
+ }
+ }
+
+ @Test
+ public void masqueradeAsCanGetLockAfterTXStateIsReplaced(){
+ TXStateProxy tx;
+
+ Thread t1 = new Thread(new Runnable() {
+ public void run() {
+ TXStateProxy tx1, tx2;
+
+ tx1 = txMgr.getHostedTXState(txid);
+ assertNull(tx1);
+ tx1 =txMgr.getOrSetHostedTXState(txid, msg);
+ assertNotNull(tx1);
+ assertTrue(txMgr.getLock(tx1, txid));
+ try {
+ Thread.sleep(20);
+ } catch (InterruptedException e) {
+ }
+ txMgr.removeHostedTXState(txid);
+
+ tx2 =txMgr.getOrSetHostedTXState(txid, msg);
+ assertNotNull(tx2);
+ assertTrue(txMgr.getLock(tx2, txid));
+
+ tx2.getLock().unlock();
+ tx1.getLock().unlock();
+ }
+ });
+ t1.start();
+
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+
+ }
+ try {
+ tx = txMgr.masqueradeAs(msg);
+ assertNotNull(tx);
+ tx.getLock().unlock();
+ try {
+ t1.join();
+ } catch (InterruptedException e) {
+
+ }
+ } catch (InterruptedException ie) {
+
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8a6a805b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java
new file mode 100644
index 0000000..bfc3ff6
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.partitioned;
+
+import static org.mockito.Mockito.*;
+
+import java.io.IOException;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.internal.stubbing.answers.CallsRealMethods;
+
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.query.QueryException;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.internal.cache.DataLocationException;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.TXId;
+import com.gemstone.gemfire.internal.cache.TXManagerImpl;
+import com.gemstone.gemfire.internal.cache.TXStateProxy;
+import com.gemstone.gemfire.internal.cache.TXStateProxyImpl;
+import com.gemstone.gemfire.test.fake.Fakes;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+
+@Category(UnitTest.class)
+public class PartitionMessageTest {
+
+ private GemFireCacheImpl cache;
+ private PartitionMessage msg;
+ private DistributionManager dm;
+ private PartitionedRegion pr;
+ private TXManagerImpl txMgr;
+ private TXId txid;
+ private long startTime = 1;
+ TXStateProxy tx;
+
+ @Before
+ public void setUp() {
+ cache = Fakes.cache();
+ dm = mock(DistributionManager.class);
+ msg = mock(PartitionMessage.class);
+ pr = mock(PartitionedRegion.class);
+ txMgr = new TXManagerImpl(null, cache);
+ tx = mock(TXStateProxyImpl.class);
+ txid = new TXId(null, 0);
+
+ when(msg.checkCacheClosing(dm)).thenReturn(false);
+ when(msg.checkDSClosing(dm)).thenReturn(false);
+ try {
+ when(msg.getPartitionedRegion()).thenReturn(pr);
+ } catch (PRLocallyDestroyedException e) {
+ e.printStackTrace();
+ }
+ when(msg.getGemFireCacheImpl()).thenReturn(cache);
+ when(msg.getStartPartitionMessageProcessingTime(pr)).thenReturn(startTime);
+ when(msg.getTXManagerImpl(cache)).thenReturn(txMgr);
+
+ doAnswer(new CallsRealMethods()).when(msg).process(dm);
+
+ }
+
+ @Test
+ public void messageWithoutTxPerformsOnRegion() {
+ try {
+ when(msg.masqueradeAs(msg, txMgr)).thenReturn(null);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ when(msg.hasTxAlreadyFinished(null, txMgr)).thenCallRealMethod();
+ msg.process(dm);
+
+ try {
+ verify(msg, times(1)).operateOnPartitionedRegion(dm, pr, startTime);
+ } catch (CacheException | QueryException | DataLocationException | InterruptedException | IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void messageForUnFinishedTXPerformsOnRegion() {
+ try {
+ when(msg.masqueradeAs(msg, txMgr)).thenReturn(tx);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ when(msg.hasTxAlreadyFinished(tx, txMgr)).thenCallRealMethod();
+ msg.process(dm);
+
+ try {
+ verify(msg, times(1)).operateOnPartitionedRegion(dm, pr, startTime);
+ } catch (CacheException | QueryException | DataLocationException | InterruptedException | IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void messageForFinishedTXDoesNotPerformOnRegion() {
+ try {
+ when(msg.masqueradeAs(msg, txMgr)).thenReturn(tx);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ when(msg.hasTXRecentlyCompleted(txid, txMgr)).thenReturn(true);
+ when(msg.hasTxAlreadyFinished(tx, txMgr)).thenCallRealMethod();
+ msg.process(dm);
+
+ try {
+ verify(msg, times(0)).operateOnPartitionedRegion(dm, pr, startTime);
+ } catch (CacheException | QueryException | DataLocationException | InterruptedException | IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+}
[2/2] incubator-geode git commit: Handle late arrival of inflight p2p
transaction message.
Posted by es...@apache.org.
Handle late arrival of inflight p2p transaction message.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/2d70868b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/2d70868b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/2d70868b
Branch: refs/heads/feature/GEODE-1400
Commit: 2d70868b56c57ff571b238ef9acc83d6fd319a6c
Parents: 8a6a805
Author: eshu <es...@pivotal.io>
Authored: Tue May 24 12:08:40 2016 -0700
Committer: eshu <es...@pivotal.io>
Committed: Tue May 24 12:08:40 2016 -0700
----------------------------------------------------------------------
.../internal/cache/RemoteOperationMessage.java | 60 +++++++++++++++---
.../gemfire/internal/cache/TXManagerImpl.java | 49 ++++++++++++---
.../cache/partitioned/PartitionMessage.java | 64 +++++++++++++++++---
3 files changed, 149 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2d70868b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
index 42ce811..95ad647 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
@@ -187,7 +187,7 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
/**
* check to see if the cache is closing
*/
- final public boolean checkCacheClosing(DistributionManager dm) {
+ public boolean checkCacheClosing(DistributionManager dm) {
GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
// return (cache != null && cache.isClosed());
return cache == null || cache.isClosed();
@@ -197,11 +197,35 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
* check to see if the distributed system is closing
* @return true if the distributed system is closing
*/
- final public boolean checkDSClosing(DistributionManager dm) {
+ public boolean checkDSClosing(DistributionManager dm) {
InternalDistributedSystem ds = dm.getSystem();
return (ds == null || ds.isDisconnecting());
}
+ boolean hasTxAlreadyFinished(TXStateProxy tx, TXManagerImpl txMgr) {
+ if (tx == null) {
+ return false;
+ } else {
+ TXId txid = new TXId(getMemberToMasqueradeAs(), getTXUniqId());
+ if (hasTXRecentlyCompleted(txid, txMgr)) {
+ //Should only happen when handling a later arrival of transactional op from proxy,
+ //while the transaction has failed over and already committed or rolled back.
+ //Just send back reply as a success op.
+ //The client connection should be lost from proxy, or
+ //the proxy is closed for failover to occur.
+ logger.info("TxId {} has already finished." , txid);
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+
+
+ boolean hasTXRecentlyCompleted(TXId txid, TXManagerImpl txMgr) {
+ return txMgr.isHostedTxRecentlyCompleted(txid);
+ }
+
/**
* Upon receipt of the message, both process the message and send an
* acknowledgement, not necessarily in that order. Note: Any hang in this
@@ -222,8 +246,8 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
thr = new CacheClosedException(LocalizedStrings.PartitionMessage_REMOTE_CACHE_IS_CLOSED_0.toLocalizedString(dm.getId()));
return;
}
- GemFireCacheImpl gfc = (GemFireCacheImpl)CacheFactory.getInstance(dm.getSystem());
- r = gfc.getRegionByPathForProcessing(this.regionPath);
+ GemFireCacheImpl gfc = getCache(dm);
+ r = getRegionByPath(gfc);
if (r == null && failIfRegionMissing()) {
// if the distributed system is disconnecting, don't send a reply saying
// the partitioned region can't be found (bug 36585)
@@ -235,11 +259,13 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
thr = UNHANDLED_EXCEPTION;
// [bruce] r might be null here, so we have to go to the cache instance to get the txmgr
- TXManagerImpl txMgr = GemFireCacheImpl.getInstance().getTxManager();
+ TXManagerImpl txMgr = getTXManager(gfc);
TXStateProxy tx = null;
try {
- tx = txMgr.masqueradeAs(this);
- sendReply = operateOnRegion(dm, r, startTime);
+ tx = masqueradeAs(this, txMgr);
+ if (!hasTxAlreadyFinished(tx, txMgr)) {
+ sendReply = operateOnRegion(dm, r, startTime);
+ }
} finally {
txMgr.unmasquerade(tx);
}
@@ -310,6 +336,26 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
}
}
}
+
+
+ TXStateProxy masqueradeAs(TransactionMessage msg, TXManagerImpl txMgr) throws InterruptedException {
+ return txMgr.masqueradeAs(msg);
+ }
+
+
+ TXManagerImpl getTXManager(GemFireCacheImpl cache) {
+ return cache.getTxManager();
+ }
+
+
+ LocalRegion getRegionByPath(GemFireCacheImpl gfc) {
+ return gfc.getRegionByPathForProcessing(this.regionPath);
+ }
+
+
+ GemFireCacheImpl getCache(final DistributionManager dm) {
+ return (GemFireCacheImpl)CacheFactory.getInstance(dm.getSystem());
+ }
/** Send a generic ReplyMessage. This is in a method so that subclasses can override the reply message type
* @param pr the Partitioned Region for the message whose statistics are incremented
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2d70868b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
index 4b2f904..d83647b 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
@@ -729,8 +729,22 @@ public final class TXManagerImpl implements CacheTransactionManager,
return null;
}
TXId key = new TXId(msg.getMemberToMasqueradeAs(), msg.getTXUniqId());
- TXStateProxy val;
- val = this.hostedTXStates.get(key);
+ TXStateProxy val = getOrSetHostedTXState(key, msg);
+
+ if (val != null) {
+ boolean success = getLock(val, key);
+ while (!success) {
+ val = getOrSetHostedTXState(key, msg);
+ success = getLock(val, key);
+ }
+ }
+
+ setTXState(val);
+ return val;
+ }
+
+ public TXStateProxy getOrSetHostedTXState(TXId key, TransactionMessage msg) {
+ TXStateProxy val = this.hostedTXStates.get(key);
if (val == null) {
synchronized(this.hostedTXStates) {
val = this.hostedTXStates.get(key);
@@ -746,14 +760,33 @@ public final class TXManagerImpl implements CacheTransactionManager,
}
}
}
- if (val != null) {
- if (!val.getLock().isHeldByCurrentThread()) {
- val.getLock().lock();
+ return val;
+ }
+
+ boolean getLock(TXStateProxy val, TXId key) {
+ if (!val.getLock().isHeldByCurrentThread()) {
+ val.getLock().lock();
+ synchronized (this.hostedTXStates) {
+ TXStateProxy curVal = this.hostedTXStates.get(key);
+ // Inflight op could be received later than TXFailover operation.
+ if (curVal == null) {
+ if (!isHostedTxRecentlyCompleted(key)) {
+ this.hostedTXStates.put(key, val);
+ // Failover op removed the val
+ // It is possible that the same operation can be executed
+ // twice by two threads, but data is consistent.
+ }
+ } else {
+ if (val != curVal) {
+ //Failover op replaced with a new TXStateProxyImpl
+ //Use the new one instead.
+ val.getLock().unlock();
+ return false;
+ }
+ }
}
}
-
- setTXState(val);
- return val;
+ return true;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2d70868b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
index 626efef..b35133a 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
@@ -58,6 +58,7 @@ import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegionException;
import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
+import com.gemstone.gemfire.internal.cache.TXId;
import com.gemstone.gemfire.internal.cache.TXManagerImpl;
import com.gemstone.gemfire.internal.cache.TXStateProxy;
import com.gemstone.gemfire.internal.cache.TransactionMessage;
@@ -262,8 +263,8 @@ public abstract class PartitionMessage extends DistributionMessage implements
/**
* check to see if the cache is closing
*/
- final public boolean checkCacheClosing(DistributionManager dm) {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ public boolean checkCacheClosing(DistributionManager dm) {
+ GemFireCacheImpl cache = getGemFireCacheImpl();
// return (cache != null && cache.isClosed());
return cache == null || cache.isClosed();
}
@@ -272,11 +273,54 @@ public abstract class PartitionMessage extends DistributionMessage implements
* check to see if the distributed system is closing
* @return true if the distributed system is closing
*/
- final public boolean checkDSClosing(DistributionManager dm) {
+ public boolean checkDSClosing(DistributionManager dm) {
InternalDistributedSystem ds = dm.getSystem();
return (ds == null || ds.isDisconnecting());
}
+ boolean hasTxAlreadyFinished(TXStateProxy tx, TXManagerImpl txMgr) {
+ if (tx == null) {
+ return false;
+ } else {
+ TXId txid = new TXId(getMemberToMasqueradeAs(), getTXUniqId());
+ if (hasTXRecentlyCompleted(txid, txMgr)) {
+ //Should only happen when handling a later arrival of transactional op from proxy,
+ //while the transaction has failed over and already committed or rolled back.
+ //Just send back reply as a success op.
+ //The client connection should be lost from proxy, or
+ //the proxy is closed for failover to occur.
+ logger.info("TxId {} has already finished." , txid);
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+
+ PartitionedRegion getPartitionedRegion() throws PRLocallyDestroyedException {
+ return PartitionedRegion.getPRFromId(this.regionId);
+ }
+
+ GemFireCacheImpl getGemFireCacheImpl() {
+ return GemFireCacheImpl.getInstance();
+ }
+
+ TXManagerImpl getTXManagerImpl(GemFireCacheImpl cache) {
+ return cache.getTxManager();
+ }
+
+ long getStartPartitionMessageProcessingTime(PartitionedRegion pr) {
+ return pr.getPrStats().startPartitionMessageProcessing();
+ }
+
+ TXStateProxy masqueradeAs(TransactionMessage msg, TXManagerImpl txMgr) throws InterruptedException {
+ return txMgr.masqueradeAs(msg);
+ }
+
+ boolean hasTXRecentlyCompleted(TXId txid, TXManagerImpl txMgr) {
+ return txMgr.isHostedTxRecentlyCompleted(txid);
+ }
+
/**
* Upon receipt of the message, both process the message and send an
* acknowledgement, not necessarily in that order. Note: Any hang in this
@@ -298,7 +342,7 @@ public abstract class PartitionMessage extends DistributionMessage implements
thr = new CacheClosedException(LocalizedStrings.PartitionMessage_REMOTE_CACHE_IS_CLOSED_0.toLocalizedString(dm.getId()));
return;
}
- pr = PartitionedRegion.getPRFromId(this.regionId);
+ pr = getPartitionedRegion();
if (pr == null && failIfRegionMissing()) {
// if the distributed system is disconnecting, don't send a reply saying
// the partitioned region can't be found (bug 36585)
@@ -307,19 +351,21 @@ public abstract class PartitionMessage extends DistributionMessage implements
}
if (pr != null) {
- startTime = pr.getPrStats().startPartitionMessageProcessing();
+ startTime = getStartPartitionMessageProcessingTime(pr);
}
thr = UNHANDLED_EXCEPTION;
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ GemFireCacheImpl cache = getGemFireCacheImpl();
if(cache==null) {
throw new ForceReattemptException(LocalizedStrings.PartitionMessage_REMOTE_CACHE_IS_CLOSED_0.toLocalizedString());
}
- TXManagerImpl txMgr = cache.getTxManager();
+ TXManagerImpl txMgr = getTXManagerImpl(cache);
TXStateProxy tx = null;
try {
- tx = txMgr.masqueradeAs(this);
- sendReply = operateOnPartitionedRegion(dm, pr, startTime);
+ tx = masqueradeAs(this, txMgr);
+ if (!hasTxAlreadyFinished(tx, txMgr)) {
+ sendReply = operateOnPartitionedRegion(dm, pr, startTime);
+ }
} finally {
txMgr.unmasquerade(tx);
}