You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2018/08/24 20:37:45 UTC
[geode] 02/02: wip -- refactor classes.
This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch feature/GEODE-5624
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 4193fb719694615765fea70fd2169a1b6653c3ce
Author: eshu <es...@pivotal.io>
AuthorDate: Fri Aug 24 13:37:08 2018 -0700
wip -- refactor classes.
---
.../ClientServerJTAFailoverDistributedTest.java | 14 +-
.../internal/cache/SingleThreadJTAExecutor.java | 197 ++++++++++++++++++
.../org/apache/geode/internal/cache/TXState.java | 225 +++------------------
.../cache/TXStateSynchronizationRunnable.java | 144 -------------
.../cache/SingleThreadJTAExecutorTest.java | 109 ++++++++++
.../apache/geode/internal/cache/TXStateTest.java | 51 ++---
6 files changed, 363 insertions(+), 377 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java
index e4589fa..2ae4e9b 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java
@@ -130,7 +130,8 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.PARTITION)
.setPartitionAttributes(partitionAttributes).create(regionName);
if (hasReplicateRegion) {
- cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.REPLICATE).create(replicateRegionName);
+ cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.REPLICATE)
+ .create(replicateRegionName);
}
CacheServer server = cacheRule.getCache().addCacheServer();
@@ -154,7 +155,8 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.LOCAL);
crf.setPoolName(pool.getName());
crf.create(regionName);
- if (hasReplicateRegion) crf.create(replicateRegionName);
+ if (hasReplicateRegion)
+ crf.create(replicateRegionName);
if (ports.length > 1) {
pool.acquireConnection(new ServerLocation(hostName, port1));
@@ -178,11 +180,12 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
Object[] results = new Object[2];
InternalClientCache cache = clientCacheRule.getClientCache();
Region region = cache.getRegion(regionName);
- Region replicateRegion = hasReplicateRegion? cache.getRegion(replicateRegionName) : null;
+ Region replicateRegion = hasReplicateRegion ? cache.getRegion(replicateRegionName) : null;
TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
txManager.begin();
region.put(key, newValue);
- if (hasReplicateRegion) replicateRegion.put(key, newValue);
+ if (hasReplicateRegion)
+ replicateRegion.put(key, newValue);
TXStateProxyImpl txStateProxy = (TXStateProxyImpl) txManager.getTXState();
ClientTXStateStub clientTXStateStub = (ClientTXStateStub) txStateProxy.getRealDeal(null, null);
@@ -212,7 +215,8 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
}
if (isCommit) {
assertEquals(newValue, region.get(key));
- if (hasReplicateRegion) assertEquals(newValue, replicateRegion.get(key));
+ if (hasReplicateRegion)
+ assertEquals(newValue, replicateRegion.get(key));
} else {
assertEquals(value, region.get(key));
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java
new file mode 100644
index 0000000..4df8ca4
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import java.util.concurrent.Executor;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.cache.SynchronizationCommitConflictException;
+import org.apache.geode.internal.logging.LogService;
+
+/**
+ * SingleThreadJTAExecutor runs beforeCompletion and afterCompletion in one executor task.
+ * Call executeBeforeCompletion() with the TXState and an Executor to start the task.
+ * Then you must invoke executeAfterCompletion() with the transaction status, or cleanup(),
+ * so that the task stops waiting and finishes.
+ *
+ * @since Geode 1.6.0
+ */
+public class SingleThreadJTAExecutor {
+ private static final Logger logger = LogService.getLogger();
+
+ private final Object beforeCompletionSync = new Object();
+ private boolean beforeCompletionStarted;
+ private boolean beforeCompletionFinished;
+ private SynchronizationCommitConflictException beforeCompletionException;
+
+ private final Object afterCompletionSync = new Object();
+ private boolean afterCompletionStarted;
+ private boolean afterCompletionFinished;
+ private int afterCompletionStatus = -1;
+ private boolean afterCompletionCancelled;
+ private RuntimeException afterCompletionException;
+
+ public SingleThreadJTAExecutor() {}
+
+ void doOps(TXState txState) {
+ doBeforeCompletionOp(txState);
+ doAfterCompletionOp(txState);
+ }
+
+ void doBeforeCompletionOp(TXState txState) {
+ synchronized (beforeCompletionSync) {
+ try {
+ txState.doBeforeCompletion();
+ } catch (SynchronizationCommitConflictException exception) {
+ beforeCompletionException = exception;
+ } finally {
+ if (logger.isDebugEnabled()) {
+ logger.debug("beforeCompletion notification completed");
+ }
+ beforeCompletionFinished = true;
+ beforeCompletionSync.notifyAll();
+ }
+ }
+ }
+
+ boolean isBeforeCompletionStarted() {
+ synchronized (beforeCompletionSync) {
+ return beforeCompletionStarted;
+ }
+ }
+
+ boolean isAfterCompletionStarted() {
+ synchronized (afterCompletionSync) {
+ return afterCompletionStarted;
+ }
+ }
+
+ boolean isBeforeCompletionFinished() {
+ synchronized (beforeCompletionSync) {
+ return beforeCompletionFinished;
+ }
+ }
+
+ boolean isAfterCompletionFinished() {
+ synchronized (afterCompletionSync) {
+ return afterCompletionFinished;
+ }
+ }
+
+ public void executeBeforeCompletion(TXState txState, Executor executor) {
+ executor.execute(() -> doOps(txState));
+
+ synchronized (beforeCompletionSync) {
+ beforeCompletionStarted = true;
+ while (!beforeCompletionFinished) {
+ try {
+ beforeCompletionSync.wait(1000);
+ } catch (InterruptedException ignore) {
+ // eat the interrupt and check for exit conditions
+ }
+ txState.getCache().getCancelCriterion().checkCancelInProgress(null);
+ }
+ if (getBeforeCompletionException() != null) {
+ throw getBeforeCompletionException();
+ }
+ }
+ }
+
+ SynchronizationCommitConflictException getBeforeCompletionException() {
+ return beforeCompletionException;
+ }
+
+ private void doAfterCompletionOp(TXState txState) {
+ synchronized (afterCompletionSync) {
+ // A transaction timeout keeps this thread from sitting around forever
+ // if the client goes away: when the client departs, the transaction/JTA
+ // times out and the TXState cleanup code calls cleanup() on this
+ // executor, which sets afterCompletionCancelled and wakes this thread
+ // so that it stops waiting for afterCompletion.
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ while (afterCompletionStatus == -1 && !afterCompletionCancelled) {
+ try {
+ if (isDebugEnabled) {
+ logger.debug("waiting for afterCompletion notification");
+ }
+ afterCompletionSync.wait(1000);
+ } catch (InterruptedException ignore) {
+ // eat the interrupt and check for exit conditions
+ }
+ }
+ afterCompletionStarted = true;
+ if (isDebugEnabled) {
+ logger.debug("executing afterCompletion notification");
+ }
+ try {
+ if (!afterCompletionCancelled) {
+ txState.doAfterCompletion(afterCompletionStatus);
+ } else {
+ txState.doCleanup();
+ }
+ } catch (RuntimeException exception) {
+ afterCompletionException = exception;
+ } finally {
+ if (isDebugEnabled) {
+ logger.debug("afterCompletion notification completed");
+ }
+ afterCompletionFinished = true;
+ afterCompletionSync.notifyAll();
+ }
+ }
+ }
+
+ public void executeAfterCompletion(TXState txState, int status) {
+ synchronized (afterCompletionSync) {
+ afterCompletionStatus = status;
+ afterCompletionSync.notifyAll();
+ waitForAfterCompletionToFinish(txState);
+ if (getAfterCompletionException() != null) {
+ throw getAfterCompletionException();
+ }
+ }
+ }
+
+ private void waitForAfterCompletionToFinish(TXState txState) {
+ while (!afterCompletionFinished) {
+ try {
+ afterCompletionSync.wait(1000);
+ } catch (InterruptedException ignore) {
+ // eat the interrupt and check for exit conditions
+ }
+ txState.getCache().getCancelCriterion().checkCancelInProgress(null);
+ }
+ }
+
+ RuntimeException getAfterCompletionException() {
+ return afterCompletionException;
+ }
+
+ /**
+ * stop waiting for an afterCompletion to arrive and just exit
+ */
+ public void cleanup(TXState txState) {
+ synchronized (afterCompletionSync) {
+ afterCompletionCancelled = true;
+ afterCompletionSync.notifyAll();
+ waitForAfterCompletionToFinish(txState);
+ }
+ }
+
+ public boolean shouldDoCleanup() {
+ return isBeforeCompletionStarted() && !isAfterCompletionStarted();
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index f33267e..bce1af4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -32,7 +32,6 @@ import javax.transaction.Status;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
-import org.apache.geode.InternalGemFireError;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.CommitConflictException;
import org.apache.geode.cache.DiskAccessException;
@@ -44,7 +43,6 @@ import org.apache.geode.cache.Region.Entry;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.SynchronizationCommitConflictException;
import org.apache.geode.cache.TransactionDataRebalancedException;
-import org.apache.geode.cache.TransactionException;
import org.apache.geode.cache.TransactionId;
import org.apache.geode.cache.TransactionWriter;
import org.apache.geode.cache.TransactionWriterException;
@@ -52,7 +50,6 @@ import org.apache.geode.cache.UnsupportedOperationInTransactionException;
import org.apache.geode.cache.client.internal.ServerRegionDataAccess;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.TXManagerCancelledException;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.control.MemoryThresholds;
@@ -108,9 +105,7 @@ public class TXState implements TXStateInterface {
* and afterCompletion so that beforeCompletion can obtain locks for the afterCompletion step.
* This is that thread
*/
- protected volatile TXStateSynchronizationRunnable syncRunnable;
- private volatile SynchronizationCommitConflictException beforeCompletionException;
- private volatile RuntimeException afterCompletionException;
+ private final SingleThreadJTAExecutor singleThreadJTAExecutor;
// Internal testing hooks
private Runnable internalAfterReservation;
@@ -153,6 +148,11 @@ public class TXState implements TXStateInterface {
private volatile DistributedMember proxyServer;
public TXState(TXStateProxy proxy, boolean onBehalfOfRemoteStub) {
+ this(proxy, onBehalfOfRemoteStub, new SingleThreadJTAExecutor());
+ }
+
+ public TXState(TXStateProxy proxy, boolean onBehalfOfRemoteStub,
+ SingleThreadJTAExecutor singleThreadJTAExecutor) {
this.beginTime = CachePerfStats.getStatTime();
this.regions = new IdentityHashMap<>();
@@ -165,7 +165,7 @@ public class TXState implements TXStateInterface {
this.internalAfterSend = null;
this.proxy = proxy;
this.onBehalfOfRemoteStub = onBehalfOfRemoteStub;
-
+ this.singleThreadJTAExecutor = singleThreadJTAExecutor;
}
private boolean hasSeenEvent(EntryEventImpl event) {
@@ -428,7 +428,7 @@ public class TXState implements TXStateInterface {
}
/*
- * If there is a TransactionWriter plugged in, we need to to give it an opportunity to abort
+ * If there is a TransactionWriter plugged in, we need to give it an opportunity to abort
* the transaction.
*/
TransactionWriter writer = this.proxy.getTxMgr().getWriter();
@@ -868,6 +868,14 @@ public class TXState implements TXStateInterface {
}
protected void cleanup() {
+ if (singleThreadJTAExecutor.shouldDoCleanup()) {
+ singleThreadJTAExecutor.cleanup(this);
+ } else {
+ doCleanup();
+ }
+ }
+
+ protected void doCleanup() {
IllegalArgumentException iae = null;
try {
this.closed = true;
@@ -921,9 +929,7 @@ public class TXState implements TXStateInterface {
synchronized (this.completionGuard) {
this.completionGuard.notifyAll();
}
- if (this.syncRunnable != null) {
- this.syncRunnable.abort();
- }
+
if (iae != null && !this.proxy.getCache().isClosed()) {
throw iae;
}
@@ -1010,75 +1016,6 @@ public class TXState implements TXStateInterface {
}
}
-// //////////////////////////////////////////////////////////////////
-// // JTA Synchronization implementation //
-// //////////////////////////////////////////////////////////////////
-// /*
-// * (non-Javadoc)
-// *
-// * @see org.apache.geode.internal.cache.TXStateInterface#beforeCompletion()
-// */
-// @Override
-// public synchronized void beforeCompletion() throws SynchronizationCommitConflictException {
-// if (this.closed) {
-// throw new TXManagerCancelledException();
-// }
-// if (beforeCompletionCalled) {
-// // do not re-execute beforeCompletion again
-// return;
-// }
-// beforeCompletionCalled = true;
-// doBeforeCompletion();
-// }
-//
-// private void doBeforeCompletion() {
-// final long opStart = CachePerfStats.getStatTime();
-// this.jtaLifeTime = opStart - getBeginTime();
-// try {
-// reserveAndCheck();
-// /*
-// * If there is a TransactionWriter plugged in, we need to to give it an opportunity to abort
-// * the transaction.
-// */
-// TransactionWriter writer = this.proxy.getTxMgr().getWriter();
-// if (writer != null) {
-// try {
-// // need to mark this so we don't fire again in commit
-// firedWriter = true;
-// TXEvent event = getEvent();
-// if (!event.hasOnlyInternalEvents()) {
-// writer.beforeCommit(event);
-// }
-// } catch (TransactionWriterException twe) {
-// throw new CommitConflictException(twe);
-// } catch (VirtualMachineError err) {
-// // cleanup(); this allocates objects so I don't think we can do it - that leaves the TX
-// // open, but we are poison pilling so we should be ok??
-//
-// SystemFailure.initiateFailure(err);
-// // If this ever returns, rethrow the error. We're poisoned
-// // now, so don't let this thread continue.
-// throw err;
-// } catch (Throwable t) {
-// // Whenever you catch Error or Throwable, you must also
-// // catch VirtualMachineError (see above). However, there is
-// // _still_ a possibility that you are dealing with a cascading
-// // error condition, so you also need to check to see if the JVM
-// // is still usable:
-// SystemFailure.checkFailure();
-// throw new CommitConflictException(t);
-// }
-// }
-// } catch (CommitConflictException commitConflict) {
-// cleanup();
-// proxy.getTxMgr().noteCommitFailure(opStart, this.jtaLifeTime, this);
-// throw new SynchronizationCommitConflictException(
-// LocalizedStrings.TXState_CONFLICT_DETECTED_IN_GEMFIRE_TRANSACTION_0
-// .toLocalizedString(getTransactionId()),
-// commitConflict);
-// }
-// }
-
//////////////////////////////////////////////////////////////////
// JTA Synchronization implementation //
//////////////////////////////////////////////////////////////////
@@ -1098,55 +1035,23 @@ public class TXState implements TXStateInterface {
return;
}
beforeCompletionCalled = true;
-
- TXStateSynchronizationRunnable sync = createTxStateSynchronizationRunnable();
- setSynchronizationRunnable(sync);
-
- Executor exec = getExecutor();
- exec.execute(sync);
- sync.waitForFirstExecution();
- if (getBeforeCompletionException() != null) {
- throw getBeforeCompletionException();
- }
- //doBeforeCompletion();
- }
-
- TXStateSynchronizationRunnable createTxStateSynchronizationRunnable() {
- Runnable beforeCompletion = new Runnable() {
- @SuppressWarnings("synthetic-access")
- public void run() {
- doBeforeCompletion();
- }
- };
-
- return new TXStateSynchronizationRunnable(getCache().getCancelCriterion(),
- beforeCompletion);
+ singleThreadJTAExecutor.executeBeforeCompletion(this,
+ getExecutor());
}
Executor getExecutor() {
- return InternalDistributedSystem.getConnectedInstance().getDistributionManager()
- .getWaitingThreadPool();
- }
-
- SynchronizationCommitConflictException getBeforeCompletionException() {
- return beforeCompletionException;
- }
-
- private void setSynchronizationRunnable(TXStateSynchronizationRunnable synchronizationRunnable) {
- syncRunnable = synchronizationRunnable;
+ return getCache().getDistributionManager().getWaitingThreadPool();
}
-
- private void doBeforeCompletion() {
+ void doBeforeCompletion() {
proxy.getTxMgr().setTXState(null);
final long opStart = CachePerfStats.getStatTime();
this.jtaLifeTime = opStart - getBeginTime();
-
try {
reserveAndCheck();
/*
- * If there is a TransactionWriter plugged in, we need to to give it an opportunity to abort
+ * If there is a TransactionWriter plugged in, we need to give it an opportunity to abort
* the transaction.
*/
TransactionWriter writer = this.proxy.getTxMgr().getWriter();
@@ -1189,28 +1094,18 @@ public class TXState implements TXStateInterface {
}
/*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TXStateInterface#afterCompletion(int)
- */
+ * (non-Javadoc)
+ *
+ * @see org.apache.geode.internal.cache.TXStateInterface#afterCompletion(int)
+ */
@Override
public synchronized void afterCompletion(int status) {
proxy.getTxMgr().setTXState(null);
- Runnable afterCompletion = new Runnable() {
- @SuppressWarnings("synthetic-access")
- public void run() {
- doAfterCompletion(status);
- }
- };
// if there was a beforeCompletion call then there will be a thread
// sitting in the waiting pool to execute afterCompletion. Otherwise
// throw FailedSynchronizationException().
- TXStateSynchronizationRunnable sync = getSynchronizationRunnable();
- if (sync != null) {
- sync.runSecondRunnable(afterCompletion);
- if (getAfterCompletionException() != null) {
- throw getAfterCompletionException();
- }
+ if (beforeCompletionCalled) {
+ singleThreadJTAExecutor.executeAfterCompletion(this, status);
} else {
// rollback does not run beforeCompletion.
if (status != Status.STATUS_ROLLEDBACK) {
@@ -1221,15 +1116,7 @@ public class TXState implements TXStateInterface {
}
}
- TXStateSynchronizationRunnable getSynchronizationRunnable() {
- return this.syncRunnable;
- }
-
- RuntimeException getAfterCompletionException() {
- return afterCompletionException;
- }
-
- private void doAfterCompletion(int status) {
+ void doAfterCompletion(int status) {
final long opStart = CachePerfStats.getStatTime();
switch (status) {
case Status.STATUS_COMMITTED:
@@ -1259,60 +1146,6 @@ public class TXState implements TXStateInterface {
}
}
-// /*
-// * (non-Javadoc)
-// *
-// * @see org.apache.geode.internal.cache.TXStateInterface#afterCompletion(int)
-// */
-// @Override
-// public synchronized void afterCompletion(int status) {
-// this.proxy.getTxMgr().setTXState(null);
-// // For commit, beforeCompletion should be called. Otherwise
-// // throw FailedSynchronizationException().
-// if (wasBeforeCompletionCalled()) {
-// doAfterCompletion(status);
-// } else {
-// // rollback does not run beforeCompletion.
-// if (status != Status.STATUS_ROLLEDBACK) {
-// throw new FailedSynchronizationException(
-// "Could not execute afterCompletion when beforeCompletion was not executed");
-// }
-// doAfterCompletion(status);
-// }
-// }
-//
-// private void doAfterCompletion(int status) {
-// final long opStart = CachePerfStats.getStatTime();
-// try {
-// switch (status) {
-// case Status.STATUS_COMMITTED:
-// Assert.assertTrue(this.locks != null,
-// "Gemfire Transaction afterCompletion called with illegal state.");
-// try {
-// commit();
-// saveTXCommitMessageForClientFailover();
-// } catch (CommitConflictException error) {
-// Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId()
-// + " afterCompletion failed.due to CommitConflictException: " + error);
-// }
-//
-// this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime, this);
-// this.locks = null;
-// break;
-// case Status.STATUS_ROLLEDBACK:
-// this.jtaLifeTime = opStart - getBeginTime();
-// rollback();
-// saveTXCommitMessageForClientFailover();
-// this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this);
-// break;
-// default:
-// Assert.assertTrue(false, "Unknown JTA Synchronization status " + status);
-// }
-// } catch (InternalGemFireError error) {
-// throw new TransactionException(error);
-// }
-// }
-
boolean wasBeforeCompletionCalled() {
return beforeCompletionCalled;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java
deleted file mode 100644
index 28f367b..0000000
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache;
-
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.CancelCriterion;
-import org.apache.geode.internal.cache.tier.sockets.CommBufferPool;
-import org.apache.geode.internal.logging.LogService;
-
-/**
- * TXStateSynchronizationThread manages beforeCompletion and afterCompletion calls.
- * The thread should be instantiated with a Runnable that invokes beforeCompletion behavior.
- * Then you must invoke runSecondRunnable() with another Runnable that invokes afterCompletion
- * behavior.
- *
- * @since Geode 1.6.0
- */
-public class TXStateSynchronizationRunnable implements Runnable {
- private static final Logger logger = LogService.getLogger();
-
- private final CancelCriterion cancelCriterion;
-
- private Runnable firstRunnable;
- private final Object firstRunnableSync = new Object();
- private boolean firstRunnableCompleted;
-
- private Runnable secondRunnable;
- private final Object secondRunnableSync = new Object();
- private boolean secondRunnableCompleted;
-
- private boolean abort;
-
- public TXStateSynchronizationRunnable(final CancelCriterion cancelCriterion, final Runnable beforeCompletion) {
- this.cancelCriterion = cancelCriterion;
- this.firstRunnable = beforeCompletion;
- }
-
- @Override
- public void run() {
- doSynchronizationOps();
- }
-
- private void doSynchronizationOps() {
- synchronized (this.firstRunnableSync) {
- try {
- this.firstRunnable.run();
- } finally {
- if (logger.isTraceEnabled()) {
- logger.trace("beforeCompletion notification completed");
- }
- this.firstRunnableCompleted = true;
- this.firstRunnable = null;
- this.firstRunnableSync.notifyAll();
- }
- }
- synchronized (this.secondRunnableSync) {
- // TODO there should be a transaction timeout that keeps this thread
- // from sitting around forever if the client goes away
- final boolean isTraceEnabled = logger.isTraceEnabled();
- while (this.secondRunnable == null && !this.abort) {
- try {
- if (isTraceEnabled) {
- logger.trace("waiting for afterCompletion notification");
- }
- this.secondRunnableSync.wait(1000);
- } catch (InterruptedException ignore) {
- // eat the interrupt and check for exit conditions
- }
- }
- if (isTraceEnabled) {
- logger.trace("executing afterCompletion notification");
- }
- try {
- if (!this.abort) {
- this.secondRunnable.run();
- }
- } finally {
- if (isTraceEnabled) {
- logger.trace("afterCompletion notification completed");
- }
- this.secondRunnableCompleted = true;
- this.secondRunnable = null;
- this.secondRunnableSync.notifyAll();
- }
- }
- }
-
- /**
- * wait for the initial beforeCompletion step to finish
- */
- public void waitForFirstExecution() {
- synchronized (this.firstRunnableSync) {
- while (!this.firstRunnableCompleted) {
- try {
- this.firstRunnableSync.wait(1000);
- } catch (InterruptedException ignore) {
- // eat the interrupt and check for exit conditions
- }
- cancelCriterion.checkCancelInProgress(null);
- }
- }
- }
-
- /**
- * run the afterCompletion portion of synchronization. This method schedules execution of the
- * given runnable and then waits for it to finish running
- */
- public void runSecondRunnable(Runnable r) {
- synchronized (this.secondRunnableSync) {
- this.secondRunnable = r;
- this.secondRunnableSync.notifyAll();
- while (!this.secondRunnableCompleted && !this.abort) {
- try {
- this.secondRunnableSync.wait(1000);
- } catch (InterruptedException ignore) {
- // eat the interrupt and check for exit conditions
- }
- cancelCriterion.checkCancelInProgress(null);
- }
- }
- }
-
- /**
- * stop waiting for an afterCompletion to arrive and just exit
- */
- public void abort() {
- synchronized (this.secondRunnableSync) {
- this.abort = true;
- }
- }
-}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java
new file mode 100644
index 0000000..79f4324
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Java6Assertions.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.transaction.Status;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.SynchronizationCommitConflictException;
+import org.apache.geode.cache.TransactionException;
+
+public class SingleThreadJTAExecutorTest {
+ private TXState txState;
+ private SingleThreadJTAExecutor singleThreadJTAExecutor;
+ private ExecutorService executor;
+
+ @Before
+ public void setup() {
+ txState = mock(TXState.class, RETURNS_DEEP_STUBS);
+ executor = Executors.newSingleThreadExecutor();
+ }
+
+ @Test
+ public void executeBeforeCompletionCallsDoBeforeCompletion() {
+ singleThreadJTAExecutor = new SingleThreadJTAExecutor();
+
+ singleThreadJTAExecutor.executeBeforeCompletion(txState, executor);
+
+ verify(txState, times(1)).doBeforeCompletion();
+
+ assertThat(singleThreadJTAExecutor.isBeforeCompletionFinished()).isTrue();
+ }
+
+ @Test(expected = SynchronizationCommitConflictException.class)
+ public void executeBeforeCompletionThrowsExceptionIfBeforeCompletionFailed() {
+ singleThreadJTAExecutor = new SingleThreadJTAExecutor();
+ doThrow(new SynchronizationCommitConflictException("")).when(txState).doBeforeCompletion();
+
+ singleThreadJTAExecutor.executeBeforeCompletion(txState, executor);
+
+ verify(txState, times(1)).doBeforeCompletion();
+ assertThat(singleThreadJTAExecutor.isBeforeCompletionFinished()).isTrue();
+ }
+
+ @Test
+ public void executeAfterCompletionCallsDoAfterCompletion() {
+ singleThreadJTAExecutor = new SingleThreadJTAExecutor();
+ int status = Status.STATUS_COMMITTED;
+
+ singleThreadJTAExecutor.executeBeforeCompletion(txState, executor);
+ singleThreadJTAExecutor.executeAfterCompletion(txState, status);
+
+ verify(txState, times(1)).doBeforeCompletion();
+ verify(txState, times(1)).doAfterCompletion(status);
+ assertThat(singleThreadJTAExecutor.isBeforeCompletionFinished()).isTrue();
+ }
+
+ @Test
+ public void executeAfterCompletionThrowsExceptionIfAfterCompletionFailed() {
+ singleThreadJTAExecutor = new SingleThreadJTAExecutor();
+ int status = Status.STATUS_COMMITTED;
+ TransactionException exception = new TransactionException("");
+ doThrow(exception).when(txState).doAfterCompletion(status);
+
+ singleThreadJTAExecutor.executeBeforeCompletion(txState, executor);
+
+ assertThatThrownBy(() -> singleThreadJTAExecutor.executeAfterCompletion(txState, status))
+ .isSameAs(exception);
+ verify(txState, times(1)).doBeforeCompletion();
+ verify(txState, times(1)).doAfterCompletion(status);
+ }
+
+ @Test
+ public void executorThreadNoLongerWaitForAfterCompletionIfTXStateIsCleanedUp() {
+ singleThreadJTAExecutor = new SingleThreadJTAExecutor();
+
+ singleThreadJTAExecutor.executeBeforeCompletion(txState, executor);
+ singleThreadJTAExecutor.cleanup(txState);
+
+ verify(txState, times(1)).doBeforeCompletion();
+ assertThat(singleThreadJTAExecutor.isBeforeCompletionFinished()).isTrue();
+ assertThat(singleThreadJTAExecutor.isAfterCompletionFinished()).isTrue();
+ }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
index 5ec4cbd..c1e9acf 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@@ -26,16 +27,17 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.concurrent.Executor;
+
import javax.transaction.Status;
import org.junit.Before;
import org.junit.Test;
import org.apache.geode.cache.CommitConflictException;
+import org.apache.geode.cache.FailedSynchronizationException;
import org.apache.geode.cache.SynchronizationCommitConflictException;
import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
-import org.apache.geode.cache.TransactionException;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
public class TXStateTest {
private TXStateProxyImpl txStateProxy;
@@ -44,60 +46,54 @@ public class TXStateTest {
@Before
public void setup() {
- txStateProxy = mock(TXStateProxyImpl.class);
+ txStateProxy = mock(TXStateProxyImpl.class, RETURNS_DEEP_STUBS); // deep stubs: unstubbed calls with mockable return types yield mocks rather than null
exception = new CommitConflictException("");
transactionDataNodeHasDepartedException = new TransactionDataNodeHasDepartedException("");
when(txStateProxy.getTxMgr()).thenReturn(mock(TXManagerImpl.class));
}
-
@Test
- public void beforeCompletionThrowsIfReserveAndCheckFails() {
+ public void doBeforeCompletionThrowsIfReserveAndCheckFails() { // a CommitConflictException from reserveAndCheck must surface as SynchronizationCommitConflictException
TXState txState = spy(new TXState(txStateProxy, true));
+ doReturn(mock(Executor.class)).when(txState).getExecutor(); // stub the spy's executor lookup with a mock Executor
doThrow(exception).when(txState).reserveAndCheck();
- assertThatThrownBy(() -> txState.beforeCompletion())
+ assertThatThrownBy(() -> txState.doBeforeCompletion())
.isInstanceOf(SynchronizationCommitConflictException.class);
}
-
@Test
- public void afterCompletionThrowsIfCommitFails() {
+ public void doAfterCompletionThrowsIfCommitFails() { // the exception thrown by commit must propagate as the same instance
TXState txState = spy(new TXState(txStateProxy, true));
doReturn(mock(InternalCache.class)).when(txState).getCache();
doReturn(true).when(txState).wasBeforeCompletionCalled();
txState.reserveAndCheck();
doThrow(transactionDataNodeHasDepartedException).when(txState).commit();
- assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED))
+ assertThatThrownBy(() -> txState.doAfterCompletion(Status.STATUS_COMMITTED))
.isSameAs(transactionDataNodeHasDepartedException);
}
@Test
- public void afterCompletionThrowsTransactionExceptionIfCommitFailedCommitConflictException() {
- TXState txState = spy(new TXState(txStateProxy, true));
- doReturn(mock(InternalCache.class)).when(txState).getCache();
- doReturn(true).when(txState).wasBeforeCompletionCalled();
- doThrow(exception).when(txState).commit();
-
- assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED))
- .isInstanceOf(TransactionException.class);
- }
-
- @Test
- public void afterCompletionCanCommitJTA() {
+ public void doAfterCompletionCanCommitJTA() { // committed status: locks end up null and the failover commit message is saved once
TXState txState = spy(new TXState(txStateProxy, false));
doReturn(mock(InternalCache.class)).when(txState).getCache();
txState.reserveAndCheck();
txState.closed = true;
doReturn(true).when(txState).wasBeforeCompletionCalled();
- txState.afterCompletion(Status.STATUS_COMMITTED);
+ txState.doAfterCompletion(Status.STATUS_COMMITTED); // invoked directly on the spy; its effects are asserted on the next two lines
assertThat(txState.locks).isNull();
verify(txState, times(1)).saveTXCommitMessageForClientFailover();
}
+ @Test
+ public void afterCompletionThrowsExceptionIfBeforeCompletionNotCalled() { // afterCompletion without a prior beforeCompletion must fail
+ TXState txState = new TXState(txStateProxy, true); // real instance (not a spy); built outside the assertion so a constructor failure cannot satisfy it
+ assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED)).isInstanceOf(FailedSynchronizationException.class);
+ }
+
@Test
public void afterCompletionCanRollbackJTA() {
TXState txState = spy(new TXState(txStateProxy, true));
@@ -153,16 +149,7 @@ public class TXStateTest {
public void getOriginatingMemberReturnsNullIfNotOriginatedFromClient() {
TXState txState = spy(new TXState(txStateProxy, false));
- assertThat(txState.getOriginatingMember()).isNull();
+ assertThat(txState.getOriginatingMember()).isSameAs(txStateProxy.getOnBehalfOfClientMember()); // NOTE(review): method name still says "ReturnsNull" but this asserts identity with the proxy's value — confirm name/intent
}
- @Test
- public void getOriginatingMemberReturnsClientMemberIfOriginatedFromClient() {
- InternalDistributedMember client = mock(InternalDistributedMember.class);
- TXStateProxyImpl proxy = new TXStateProxyImpl(mock(InternalCache.class),
- mock(TXManagerImpl.class), mock(TXId.class), client);
- TXState txState = spy(new TXState(proxy, false));
-
- assertThat(txState.getOriginatingMember()).isEqualTo(client);
- }
}