You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2018/08/24 20:37:44 UTC
[geode] 01/02: GEODE-5624: Use a thread to do beforeCompletion and
afterCompletion.
This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch feature/GEODE-5624
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 413b34f3a63af62094925b16f280ae4a641d4dc6
Author: eshu <es...@pivotal.io>
AuthorDate: Thu Aug 23 14:24:48 2018 -0700
GEODE-5624: Use a thread to do beforeCompletion and afterCompletion.
---
.../ClientServerJTAFailoverDistributedTest.java | 25 ++
.../org/apache/geode/internal/cache/TXState.java | 268 ++++++++++++++++++---
.../cache/TXStateSynchronizationRunnable.java | 144 +++++++++++
3 files changed, 400 insertions(+), 37 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java
index c623766..e4589fa 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java
@@ -59,12 +59,14 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
private String hostName;
private String uniqueName;
private String regionName;
+ private String replicateRegionName;
private VM server1;
private VM server2;
private VM server3;
private VM client1;
private int port1;
private int port2;
+ private boolean hasReplicateRegion = false;
private final int key = 1;
private final String value = "value1";
@@ -92,6 +94,7 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
hostName = getHostName();
uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
regionName = uniqueName + "_region";
+ replicateRegionName = uniqueName + "_replicate_region";
}
@Test
@@ -126,6 +129,9 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
PartitionAttributes partitionAttributes = factory.create();
cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.PARTITION)
.setPartitionAttributes(partitionAttributes).create(regionName);
+ if (hasReplicateRegion) {
+ cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.REPLICATE).create(replicateRegionName);
+ }
CacheServer server = cacheRule.getCache().addCacheServer();
server.setPort(0);
@@ -148,6 +154,7 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.LOCAL);
crf.setPoolName(pool.getName());
crf.create(regionName);
+ if (hasReplicateRegion) crf.create(replicateRegionName);
if (ports.length > 1) {
pool.acquireConnection(new ServerLocation(hostName, port1));
@@ -171,9 +178,11 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
Object[] results = new Object[2];
InternalClientCache cache = clientCacheRule.getClientCache();
Region region = cache.getRegion(regionName);
+ Region replicateRegion = hasReplicateRegion? cache.getRegion(replicateRegionName) : null;
TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
txManager.begin();
region.put(key, newValue);
+ if (hasReplicateRegion) replicateRegion.put(key, newValue);
TXStateProxyImpl txStateProxy = (TXStateProxyImpl) txManager.getTXState();
ClientTXStateStub clientTXStateStub = (ClientTXStateStub) txStateProxy.getRealDeal(null, null);
@@ -188,6 +197,7 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
private void doAfterCompletion(TransactionId transactionId, boolean isCommit) {
InternalClientCache cache = clientCacheRule.getClientCache();
Region region = cache.getRegion(regionName);
+ Region replicateRegion = cache.getRegion(replicateRegionName);
TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
txManager.resume(transactionId);
@@ -202,6 +212,7 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
}
if (isCommit) {
assertEquals(newValue, region.get(key));
+ if (hasReplicateRegion) assertEquals(newValue, replicateRegion.get(key));
} else {
assertEquals(value, region.get(key));
}
@@ -292,4 +303,18 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
txStateStub.beforeCompletion();
}
+ @Test
+ public void jtaCanFailoverToJTAHostForMixedRegionsAfterDoneBeforeCompletion() {
+ hasReplicateRegion = true;
+ port2 = server2.invoke(() -> createServerRegion(1, false));
+ server2.invoke(() -> doPut(key, value));
+ port1 = server1.invoke(() -> createServerRegion(1, true));
+
+ client1.invoke(() -> createClientRegion(port1, port2));
+ Object[] beforeCompletionResults = client1.invoke(() -> doBeforeCompletion());
+
+ server1.invoke(() -> cacheRule.getCache().close());
+
+ client1.invoke(() -> doAfterCompletion((TransactionId) beforeCompletionResults[0], true));
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index 9494fd3..f33267e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantLock;
import javax.transaction.Status;
@@ -51,6 +52,7 @@ import org.apache.geode.cache.UnsupportedOperationInTransactionException;
import org.apache.geode.cache.client.internal.ServerRegionDataAccess;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.TXManagerCancelledException;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.control.MemoryThresholds;
@@ -101,6 +103,15 @@ public class TXState implements TXStateInterface {
// Access this variable should be in synchronized block.
private boolean beforeCompletionCalled;
+ /**
+ * for client/server JTA transactions we need to have a single thread handle both beforeCompletion
+ * and afterCompletion so that beforeCompletion can obtain locks for the afterCompletion step.
+ * This is that thread
+ */
+ protected volatile TXStateSynchronizationRunnable syncRunnable;
+ private volatile SynchronizationCommitConflictException beforeCompletionException;
+ private volatile RuntimeException afterCompletionException;
+
// Internal testing hooks
private Runnable internalAfterReservation;
protected Runnable internalAfterConflictCheck;
@@ -910,6 +921,9 @@ public class TXState implements TXStateInterface {
synchronized (this.completionGuard) {
this.completionGuard.notifyAll();
}
+ if (this.syncRunnable != null) {
+ this.syncRunnable.abort();
+ }
if (iae != null && !this.proxy.getCache().isClosed()) {
throw iae;
}
@@ -996,6 +1010,75 @@ public class TXState implements TXStateInterface {
}
}
+// //////////////////////////////////////////////////////////////////
+// // JTA Synchronization implementation //
+// //////////////////////////////////////////////////////////////////
+// /*
+// * (non-Javadoc)
+// *
+// * @see org.apache.geode.internal.cache.TXStateInterface#beforeCompletion()
+// */
+// @Override
+// public synchronized void beforeCompletion() throws SynchronizationCommitConflictException {
+// if (this.closed) {
+// throw new TXManagerCancelledException();
+// }
+// if (beforeCompletionCalled) {
+// // do not re-execute beforeCompletion again
+// return;
+// }
+// beforeCompletionCalled = true;
+// doBeforeCompletion();
+// }
+//
+// private void doBeforeCompletion() {
+// final long opStart = CachePerfStats.getStatTime();
+// this.jtaLifeTime = opStart - getBeginTime();
+// try {
+// reserveAndCheck();
+// /*
+// * If there is a TransactionWriter plugged in, we need to to give it an opportunity to abort
+// * the transaction.
+// */
+// TransactionWriter writer = this.proxy.getTxMgr().getWriter();
+// if (writer != null) {
+// try {
+// // need to mark this so we don't fire again in commit
+// firedWriter = true;
+// TXEvent event = getEvent();
+// if (!event.hasOnlyInternalEvents()) {
+// writer.beforeCommit(event);
+// }
+// } catch (TransactionWriterException twe) {
+// throw new CommitConflictException(twe);
+// } catch (VirtualMachineError err) {
+// // cleanup(); this allocates objects so I don't think we can do it - that leaves the TX
+// // open, but we are poison pilling so we should be ok??
+//
+// SystemFailure.initiateFailure(err);
+// // If this ever returns, rethrow the error. We're poisoned
+// // now, so don't let this thread continue.
+// throw err;
+// } catch (Throwable t) {
+// // Whenever you catch Error or Throwable, you must also
+// // catch VirtualMachineError (see above). However, there is
+// // _still_ a possibility that you are dealing with a cascading
+// // error condition, so you also need to check to see if the JVM
+// // is still usable:
+// SystemFailure.checkFailure();
+// throw new CommitConflictException(t);
+// }
+// }
+// } catch (CommitConflictException commitConflict) {
+// cleanup();
+// proxy.getTxMgr().noteCommitFailure(opStart, this.jtaLifeTime, this);
+// throw new SynchronizationCommitConflictException(
+// LocalizedStrings.TXState_CONFLICT_DETECTED_IN_GEMFIRE_TRANSACTION_0
+// .toLocalizedString(getTransactionId()),
+// commitConflict);
+// }
+// }
+
//////////////////////////////////////////////////////////////////
// JTA Synchronization implementation //
//////////////////////////////////////////////////////////////////
@@ -1009,17 +1092,57 @@ public class TXState implements TXStateInterface {
if (this.closed) {
throw new TXManagerCancelledException();
}
+
if (beforeCompletionCalled) {
// do not re-execute beforeCompletion again
return;
}
beforeCompletionCalled = true;
- doBeforeCompletion();
+
+ TXStateSynchronizationRunnable sync = createTxStateSynchronizationRunnable();
+ setSynchronizationRunnable(sync);
+
+ Executor exec = getExecutor();
+ exec.execute(sync);
+ sync.waitForFirstExecution();
+ if (getBeforeCompletionException() != null) {
+ throw getBeforeCompletionException();
+ }
+ //doBeforeCompletion();
}
+ TXStateSynchronizationRunnable createTxStateSynchronizationRunnable() {
+ Runnable beforeCompletion = new Runnable() {
+ @SuppressWarnings("synthetic-access")
+ public void run() {
+ doBeforeCompletion();
+ }
+ };
+
+ return new TXStateSynchronizationRunnable(getCache().getCancelCriterion(),
+ beforeCompletion);
+ }
+
+ Executor getExecutor() {
+ return InternalDistributedSystem.getConnectedInstance().getDistributionManager()
+ .getWaitingThreadPool();
+ }
+
+ SynchronizationCommitConflictException getBeforeCompletionException() {
+ return beforeCompletionException;
+ }
+
+ private void setSynchronizationRunnable(TXStateSynchronizationRunnable synchronizationRunnable) {
+ syncRunnable = synchronizationRunnable;
+ }
+
+
private void doBeforeCompletion() {
+ proxy.getTxMgr().setTXState(null);
final long opStart = CachePerfStats.getStatTime();
this.jtaLifeTime = opStart - getBeginTime();
+
+
try {
reserveAndCheck();
/*
@@ -1066,17 +1189,28 @@ public class TXState implements TXStateInterface {
}
/*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TXStateInterface#afterCompletion(int)
- */
+ * (non-Javadoc)
+ *
+ * @see org.apache.geode.internal.cache.TXStateInterface#afterCompletion(int)
+ */
@Override
public synchronized void afterCompletion(int status) {
- this.proxy.getTxMgr().setTXState(null);
- // For commit, beforeCompletion should be called. Otherwise
+ proxy.getTxMgr().setTXState(null);
+ Runnable afterCompletion = new Runnable() {
+ @SuppressWarnings("synthetic-access")
+ public void run() {
+ doAfterCompletion(status);
+ }
+ };
+ // if there was a beforeCompletion call then there will be a thread
+ // sitting in the waiting pool to execute afterCompletion. Otherwise
// throw FailedSynchronizationException().
- if (wasBeforeCompletionCalled()) {
- doAfterCompletion(status);
+ TXStateSynchronizationRunnable sync = getSynchronizationRunnable();
+ if (sync != null) {
+ sync.runSecondRunnable(afterCompletion);
+ if (getAfterCompletionException() != null) {
+ throw getAfterCompletionException();
+ }
} else {
// rollback does not run beforeCompletion.
if (status != Status.STATUS_ROLLEDBACK) {
@@ -1087,37 +1221,97 @@ public class TXState implements TXStateInterface {
}
}
+ TXStateSynchronizationRunnable getSynchronizationRunnable() {
+ return this.syncRunnable;
+ }
+
+ RuntimeException getAfterCompletionException() {
+ return afterCompletionException;
+ }
+
private void doAfterCompletion(int status) {
final long opStart = CachePerfStats.getStatTime();
- try {
- switch (status) {
- case Status.STATUS_COMMITTED:
- Assert.assertTrue(this.locks != null,
- "Gemfire Transaction afterCompletion called with illegal state.");
- try {
- commit();
- saveTXCommitMessageForClientFailover();
- } catch (CommitConflictException error) {
- Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId()
- + " afterCompletion failed.due to CommitConflictException: " + error);
- }
-
- this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime, this);
- this.locks = null;
- break;
- case Status.STATUS_ROLLEDBACK:
- this.jtaLifeTime = opStart - getBeginTime();
- rollback();
+ switch (status) {
+ case Status.STATUS_COMMITTED:
+ Assert.assertTrue(this.locks != null,
+ "Gemfire Transaction afterCompletion called with illegal state.");
+ try {
+ proxy.getTxMgr().setTXState(null);
+ commit();
saveTXCommitMessageForClientFailover();
- this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this);
- break;
- default:
- Assert.assertTrue(false, "Unknown JTA Synchronization status " + status);
- }
- } catch (InternalGemFireError error) {
- throw new TransactionException(error);
- }
- }
+ } catch (CommitConflictException error) {
+ Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId()
+ + " afterCompletion failed.due to CommitConflictException: " + error);
+ }
+
+ this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime, this);
+ this.locks = null;
+ break;
+ case Status.STATUS_ROLLEDBACK:
+ this.jtaLifeTime = opStart - getBeginTime();
+ this.proxy.getTxMgr().setTXState(null);
+ rollback();
+ saveTXCommitMessageForClientFailover();
+ this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this);
+ break;
+ default:
+ Assert.assertTrue(false, "Unknown JTA Synchronization status " + status);
+ }
+ }
+
+// /*
+// * (non-Javadoc)
+// *
+// * @see org.apache.geode.internal.cache.TXStateInterface#afterCompletion(int)
+// */
+// @Override
+// public synchronized void afterCompletion(int status) {
+// this.proxy.getTxMgr().setTXState(null);
+// // For commit, beforeCompletion should be called. Otherwise
+// // throw FailedSynchronizationException().
+// if (wasBeforeCompletionCalled()) {
+// doAfterCompletion(status);
+// } else {
+// // rollback does not run beforeCompletion.
+// if (status != Status.STATUS_ROLLEDBACK) {
+// throw new FailedSynchronizationException(
+// "Could not execute afterCompletion when beforeCompletion was not executed");
+// }
+// doAfterCompletion(status);
+// }
+// }
+//
+// private void doAfterCompletion(int status) {
+// final long opStart = CachePerfStats.getStatTime();
+// try {
+// switch (status) {
+// case Status.STATUS_COMMITTED:
+// Assert.assertTrue(this.locks != null,
+// "Gemfire Transaction afterCompletion called with illegal state.");
+// try {
+// commit();
+// saveTXCommitMessageForClientFailover();
+// } catch (CommitConflictException error) {
+// Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId()
+// + " afterCompletion failed.due to CommitConflictException: " + error);
+// }
+//
+// this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime, this);
+// this.locks = null;
+// break;
+// case Status.STATUS_ROLLEDBACK:
+// this.jtaLifeTime = opStart - getBeginTime();
+// rollback();
+// saveTXCommitMessageForClientFailover();
+// this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this);
+// break;
+// default:
+// Assert.assertTrue(false, "Unknown JTA Synchronization status " + status);
+// }
+// } catch (InternalGemFireError error) {
+// throw new TransactionException(error);
+// }
+// }
boolean wasBeforeCompletionCalled() {
return beforeCompletionCalled;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java
new file mode 100644
index 0000000..28f367b
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.internal.cache.tier.sockets.CommBufferPool;
+import org.apache.geode.internal.logging.LogService;
+
+/**
+ * TXStateSynchronizationThread manages beforeCompletion and afterCompletion calls.
+ * The thread should be instantiated with a Runnable that invokes beforeCompletion behavior.
+ * Then you must invoke runSecondRunnable() with another Runnable that invokes afterCompletion
+ * behavior.
+ *
+ * @since Geode 1.6.0
+ */
+public class TXStateSynchronizationRunnable implements Runnable {
+ private static final Logger logger = LogService.getLogger();
+
+ private final CancelCriterion cancelCriterion;
+
+ private Runnable firstRunnable;
+ private final Object firstRunnableSync = new Object();
+ private boolean firstRunnableCompleted;
+
+ private Runnable secondRunnable;
+ private final Object secondRunnableSync = new Object();
+ private boolean secondRunnableCompleted;
+
+ private boolean abort;
+
+ public TXStateSynchronizationRunnable(final CancelCriterion cancelCriterion, final Runnable beforeCompletion) {
+ this.cancelCriterion = cancelCriterion;
+ this.firstRunnable = beforeCompletion;
+ }
+
+ @Override
+ public void run() {
+ doSynchronizationOps();
+ }
+
+ private void doSynchronizationOps() {
+ synchronized (this.firstRunnableSync) {
+ try {
+ this.firstRunnable.run();
+ } finally {
+ if (logger.isTraceEnabled()) {
+ logger.trace("beforeCompletion notification completed");
+ }
+ this.firstRunnableCompleted = true;
+ this.firstRunnable = null;
+ this.firstRunnableSync.notifyAll();
+ }
+ }
+ synchronized (this.secondRunnableSync) {
+ // TODO there should be a transaction timeout that keeps this thread
+ // from sitting around forever if the client goes away
+ final boolean isTraceEnabled = logger.isTraceEnabled();
+ while (this.secondRunnable == null && !this.abort) {
+ try {
+ if (isTraceEnabled) {
+ logger.trace("waiting for afterCompletion notification");
+ }
+ this.secondRunnableSync.wait(1000);
+ } catch (InterruptedException ignore) {
+ // eat the interrupt and check for exit conditions
+ }
+ }
+ if (isTraceEnabled) {
+ logger.trace("executing afterCompletion notification");
+ }
+ try {
+ if (!this.abort) {
+ this.secondRunnable.run();
+ }
+ } finally {
+ if (isTraceEnabled) {
+ logger.trace("afterCompletion notification completed");
+ }
+ this.secondRunnableCompleted = true;
+ this.secondRunnable = null;
+ this.secondRunnableSync.notifyAll();
+ }
+ }
+ }
+
+ /**
+ * wait for the initial beforeCompletion step to finish
+ */
+ public void waitForFirstExecution() {
+ synchronized (this.firstRunnableSync) {
+ while (!this.firstRunnableCompleted) {
+ try {
+ this.firstRunnableSync.wait(1000);
+ } catch (InterruptedException ignore) {
+ // eat the interrupt and check for exit conditions
+ }
+ cancelCriterion.checkCancelInProgress(null);
+ }
+ }
+ }
+
+ /**
+ * run the afterCompletion portion of synchronization. This method schedules execution of the
+ * given runnable and then waits for it to finish running
+ */
+ public void runSecondRunnable(Runnable r) {
+ synchronized (this.secondRunnableSync) {
+ this.secondRunnable = r;
+ this.secondRunnableSync.notifyAll();
+ while (!this.secondRunnableCompleted && !this.abort) {
+ try {
+ this.secondRunnableSync.wait(1000);
+ } catch (InterruptedException ignore) {
+ // eat the interrupt and check for exit conditions
+ }
+ cancelCriterion.checkCancelInProgress(null);
+ }
+ }
+ }
+
+ /**
+ * stop waiting for an afterCompletion to arrive and just exit
+ */
+ public void abort() {
+ synchronized (this.secondRunnableSync) {
+ this.abort = true;
+ }
+ }
+}