You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2018/08/28 19:30:01 UTC
[geode] branch develop updated: Feature/geode 5624 Use a single
thread to ensure beforeCompletion and afterCompletion are executed by the
same thread. (#2388)
This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 926f35e Feature/geode 5624 Use a single thread to ensure beforeCompletion and afterCompletion are executed by the same thread. (#2388)
926f35e is described below
commit 926f35eb1d20c3b4bbe26472aea6ad74048dc8aa
Author: pivotal-eshu <es...@pivotal.io>
AuthorDate: Tue Aug 28 12:29:56 2018 -0700
Feature/geode 5624 Use a single thread to ensure beforeCompletion and afterCompletion are executed by the same thread. (#2388)
---
.../ClientServerJTAFailoverDistributedTest.java | 37 +++++-
.../geode/internal/cache/AfterCompletion.java | 100 ++++++++++++++++
.../geode/internal/cache/BeforeCompletion.java | 68 +++++++++++
.../internal/cache/SingleThreadJTAExecutor.java | 71 +++++++++++
.../org/apache/geode/internal/cache/TXState.java | 56 +++++++--
.../geode/internal/cache/AfterCompletionTest.java | 131 +++++++++++++++++++++
.../geode/internal/cache/BeforeCompletionTest.java | 98 +++++++++++++++
.../cache/SingleThreadJTAExecutorTest.java | 66 +++++++++++
.../apache/geode/internal/cache/TXStateTest.java | 50 +++-----
9 files changed, 632 insertions(+), 45 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java
index cb6be25..5d60b6b 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java
@@ -34,6 +34,7 @@ import org.junit.Test;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.TransactionId;
import org.apache.geode.cache.client.ClientRegionFactory;
import org.apache.geode.cache.client.PoolFactory;
@@ -65,12 +66,14 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
private String hostName;
private String uniqueName;
private String regionName;
+ private String replicateRegionName;
private VM server1;
private VM server2;
private VM server3;
private VM client1;
private int port1;
private int port2;
+ private boolean hasReplicateRegion = false;
@Rule
public DistributedRule distributedRule = new DistributedRule();
@@ -94,6 +97,7 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
hostName = getHostName();
uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
regionName = uniqueName + "_region";
+ replicateRegionName = uniqueName + "_replicate_region";
}
@Test
@@ -130,6 +134,12 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
regionFactory.setPartitionAttributes(partitionAttributesFactory.create());
regionFactory.create(regionName);
+ if (hasReplicateRegion) {
+ cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.REPLICATE)
+ .create(replicateRegionName);
+ }
+
+
CacheServer server = cacheRule.getCache().addCacheServer();
server.setPort(0);
server.start();
@@ -152,6 +162,10 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
clientRegionFactory.setPoolName(pool.getName());
clientRegionFactory.create(regionName);
+ if (hasReplicateRegion) {
+ clientRegionFactory.create(replicateRegionName);
+ }
+
if (ports.length > 1) {
pool.acquireConnection(new ServerLocation(hostName, port1));
}
@@ -174,10 +188,13 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
Object[] results = new Object[2];
InternalClientCache cache = clientCacheRule.getClientCache();
Region region = cache.getRegion(regionName);
+ Region replicateRegion = hasReplicateRegion ? cache.getRegion(replicateRegionName) : null;
TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
txManager.begin();
region.put(key, newValue);
-
+ if (hasReplicateRegion) {
+ replicateRegion.put(key, newValue);
+ }
TXStateProxyImpl txStateProxy = (TXStateProxyImpl) txManager.getTXState();
ClientTXStateStub clientTXStateStub = (ClientTXStateStub) txStateProxy.getRealDeal(null, null);
clientTXStateStub.beforeCompletion();
@@ -191,6 +208,7 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
private void doAfterCompletion(TransactionId transactionId, boolean isCommit) {
InternalClientCache cache = clientCacheRule.getClientCache();
Region region = cache.getRegion(regionName);
+ Region replicateRegion = cache.getRegion(replicateRegionName);
TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
txManager.resume(transactionId);
@@ -205,6 +223,9 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
}
if (isCommit) {
assertEquals(newValue, region.get(key));
+ if (hasReplicateRegion) {
+ assertEquals(newValue, replicateRegion.get(key));
+ }
} else {
assertEquals(value, region.get(key));
}
@@ -295,4 +316,18 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
txStateStub.beforeCompletion();
}
+ @Test
+ public void jtaCanFailoverToJTAHostForMixedRegionsAfterDoneBeforeCompletion() {
+ hasReplicateRegion = true;
+ port2 = server2.invoke(() -> createServerRegion(1, false));
+ server2.invoke(() -> doPut(key, value));
+ port1 = server1.invoke(() -> createServerRegion(1, true));
+
+ client1.invoke(() -> createClientRegion(port1, port2));
+ Object[] beforeCompletionResults = client1.invoke(() -> doBeforeCompletion());
+
+ server1.invoke(() -> cacheRule.getCache().close());
+
+ client1.invoke(() -> doAfterCompletion((TransactionId) beforeCompletionResults[0], true));
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AfterCompletion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AfterCompletion.java
new file mode 100644
index 0000000..028e067
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AfterCompletion.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import java.util.function.BooleanSupplier;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.internal.logging.LogService;
+
+public class AfterCompletion {
+ private static final Logger logger = LogService.getLogger();
+
+ private boolean started;
+ private boolean finished;
+ private int status = -1;
+ private boolean cancelled;
+ private RuntimeException exception;
+
+ public synchronized void doOp(TXState txState, CancelCriterion cancelCriterion) {
+ // There should be a transaction timeout that keeps this thread
+ // from sitting around forever if the client goes away.
+ // That is handled by cancel(), which TXState.cleanup() triggers via
+ // SingleThreadJTAExecutor.cleanup(). When the client departs, the
+ // transaction/JTA times out and the cleanup code is executed.
+ waitForExecuteOrCancel(cancelCriterion);
+ started = true;
+ logger.debug("executing afterCompletion notification");
+
+ try {
+ if (cancelled) {
+ txState.doCleanup();
+ } else {
+ txState.doAfterCompletion(status);
+ }
+ } catch (RuntimeException exception) {
+ this.exception = exception;
+ } finally {
+ logger.debug("afterCompletion notification completed");
+ finished = true;
+ notifyAll();
+ }
+ }
+
+ private void waitForExecuteOrCancel(CancelCriterion cancelCriterion) {
+ waitForCondition(cancelCriterion, () -> status != -1 || cancelled);
+ }
+
+ private synchronized void waitForCondition(CancelCriterion cancelCriterion,
+ BooleanSupplier condition) {
+ while (!condition.getAsBoolean()) {
+ cancelCriterion.checkCancelInProgress(null);
+ try {
+ logger.debug("waiting for notification");
+ wait(1000);
+ } catch (InterruptedException ignore) {
+ // eat the interrupt and check for exit conditions
+ }
+ }
+ }
+
+ public synchronized void execute(CancelCriterion cancelCriterion, int status) {
+ this.status = status;
+ signalAndWaitForDoOp(cancelCriterion);
+ }
+
+ private void signalAndWaitForDoOp(CancelCriterion cancelCriterion) {
+ notifyAll();
+ waitUntilFinished(cancelCriterion);
+ if (exception != null) {
+ throw exception;
+ }
+ }
+
+ private void waitUntilFinished(CancelCriterion cancelCriterion) {
+ waitForCondition(cancelCriterion, () -> finished);
+ }
+
+ public synchronized void cancel(CancelCriterion cancelCriterion) {
+ cancelled = true;
+ signalAndWaitForDoOp(cancelCriterion);
+ }
+
+ public synchronized boolean isStarted() {
+ return started;
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BeforeCompletion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BeforeCompletion.java
new file mode 100644
index 0000000..247a0f2
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BeforeCompletion.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.SynchronizationCommitConflictException;
+import org.apache.geode.internal.logging.LogService;
+
+public class BeforeCompletion {
+ private static final Logger logger = LogService.getLogger();
+
+ private boolean started;
+ private boolean finished;
+ private SynchronizationCommitConflictException exception;
+
+ public synchronized void doOp(TXState txState) {
+ try {
+ txState.doBeforeCompletion();
+ } catch (SynchronizationCommitConflictException exception) {
+ this.exception = exception;
+ } finally {
+ logger.debug("beforeCompletion notification completed");
+ finished = true;
+ notifyAll();
+ }
+ }
+
+ public synchronized void execute(CancelCriterion cancelCriterion) {
+ started = true;
+ waitUntilFinished(cancelCriterion);
+ if (exception != null) {
+ throw exception;
+ }
+ }
+
+ private void waitUntilFinished(CancelCriterion cancelCriterion) {
+ while (!finished) {
+ cancelCriterion.checkCancelInProgress(null);
+ try {
+ wait(1000);
+ } catch (InterruptedException ignore) {
+ // eat the interrupt and check for exit conditions
+ }
+ }
+ }
+
+ public synchronized boolean isStarted() {
+ return started;
+ }
+
+ public synchronized boolean isFinished() {
+ return finished;
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java
new file mode 100644
index 0000000..636def9
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import java.util.concurrent.Executor;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.internal.logging.LogService;
+
+/**
+ * This class ensures that beforeCompletion and afterCompletion are executed in the same thread.
+ *
+ * @since Geode 1.7.0
+ */
+public class SingleThreadJTAExecutor {
+ private static final Logger logger = LogService.getLogger();
+
+ private final BeforeCompletion beforeCompletion;
+ private final AfterCompletion afterCompletion;
+
+ public SingleThreadJTAExecutor() {
+ this(new BeforeCompletion(), new AfterCompletion());
+ }
+
+ public SingleThreadJTAExecutor(BeforeCompletion beforeCompletion,
+ AfterCompletion afterCompletion) {
+ this.beforeCompletion = beforeCompletion;
+ this.afterCompletion = afterCompletion;
+ }
+
+ private void doOps(TXState txState, CancelCriterion cancelCriterion) {
+ beforeCompletion.doOp(txState);
+ afterCompletion.doOp(txState, cancelCriterion);
+ }
+
+ public void executeBeforeCompletion(TXState txState, Executor executor,
+ CancelCriterion cancelCriterion) {
+ executor.execute(() -> doOps(txState, cancelCriterion));
+
+ beforeCompletion.execute(cancelCriterion);
+ }
+
+ public void executeAfterCompletion(CancelCriterion cancelCriterion, int status) {
+ afterCompletion.execute(cancelCriterion, status);
+ }
+
+ /**
+ * stop waiting for an afterCompletion to arrive and just exit
+ */
+ public void cleanup(CancelCriterion cancelCriterion) {
+ afterCompletion.cancel(cancelCriterion);
+ }
+
+ public boolean shouldDoCleanup() {
+ return beforeCompletion.isFinished() && !afterCompletion.isStarted();
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index 9494fd3..5263e2e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -24,12 +24,14 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantLock;
import javax.transaction.Status;
import org.apache.logging.log4j.Logger;
+import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.SystemFailure;
@@ -101,6 +103,13 @@ public class TXState implements TXStateInterface {
// Access this variable should be in synchronized block.
private boolean beforeCompletionCalled;
+ /**
+ * for client/server JTA transactions we need to have a single thread handle both beforeCompletion
+ * and afterCompletion so that beforeCompletion can obtain locks for the afterCompletion step.
+ * This executor hands both steps to that single thread.
+ */
+ private final SingleThreadJTAExecutor singleThreadJTAExecutor;
+
// Internal testing hooks
private Runnable internalAfterReservation;
protected Runnable internalAfterConflictCheck;
@@ -142,6 +151,11 @@ public class TXState implements TXStateInterface {
private volatile DistributedMember proxyServer;
public TXState(TXStateProxy proxy, boolean onBehalfOfRemoteStub) {
+ this(proxy, onBehalfOfRemoteStub, new SingleThreadJTAExecutor());
+ }
+
+ public TXState(TXStateProxy proxy, boolean onBehalfOfRemoteStub,
+ SingleThreadJTAExecutor singleThreadJTAExecutor) {
this.beginTime = CachePerfStats.getStatTime();
this.regions = new IdentityHashMap<>();
@@ -154,7 +168,7 @@ public class TXState implements TXStateInterface {
this.internalAfterSend = null;
this.proxy = proxy;
this.onBehalfOfRemoteStub = onBehalfOfRemoteStub;
-
+ this.singleThreadJTAExecutor = singleThreadJTAExecutor;
}
private boolean hasSeenEvent(EntryEventImpl event) {
@@ -417,7 +431,7 @@ public class TXState implements TXStateInterface {
}
/*
- * If there is a TransactionWriter plugged in, we need to to give it an opportunity to abort
+ * If there is a TransactionWriter plugged in, we need to give it an opportunity to abort
* the transaction.
*/
TransactionWriter writer = this.proxy.getTxMgr().getWriter();
@@ -857,6 +871,14 @@ public class TXState implements TXStateInterface {
}
protected void cleanup() {
+ if (singleThreadJTAExecutor.shouldDoCleanup()) {
+ singleThreadJTAExecutor.cleanup(getCancelCriterion());
+ } else {
+ doCleanup();
+ }
+ }
+
+ void doCleanup() {
IllegalArgumentException iae = null;
try {
this.closed = true;
@@ -910,6 +932,7 @@ public class TXState implements TXStateInterface {
synchronized (this.completionGuard) {
this.completionGuard.notifyAll();
}
+
if (iae != null && !this.proxy.getCache().isClosed()) {
throw iae;
}
@@ -1006,24 +1029,36 @@ public class TXState implements TXStateInterface {
*/
@Override
public synchronized void beforeCompletion() throws SynchronizationCommitConflictException {
+ proxy.getTxMgr().setTXState(null);
if (this.closed) {
throw new TXManagerCancelledException();
}
+
if (beforeCompletionCalled) {
// do not re-execute beforeCompletion again
return;
}
beforeCompletionCalled = true;
- doBeforeCompletion();
+ singleThreadJTAExecutor.executeBeforeCompletion(this,
+ getExecutor(), getCancelCriterion());
}
- private void doBeforeCompletion() {
+ private Executor getExecutor() {
+ return getCache().getDistributionManager().getWaitingThreadPool();
+ }
+
+ private CancelCriterion getCancelCriterion() {
+ return getCache().getCancelCriterion();
+ }
+
+ void doBeforeCompletion() {
final long opStart = CachePerfStats.getStatTime();
this.jtaLifeTime = opStart - getBeginTime();
+
try {
reserveAndCheck();
/*
- * If there is a TransactionWriter plugged in, we need to to give it an opportunity to abort
+ * If there is a TransactionWriter plugged in, we need to give it an opportunity to abort
* the transaction.
*/
TransactionWriter writer = this.proxy.getTxMgr().getWriter();
@@ -1072,11 +1107,12 @@ public class TXState implements TXStateInterface {
*/
@Override
public synchronized void afterCompletion(int status) {
- this.proxy.getTxMgr().setTXState(null);
- // For commit, beforeCompletion should be called. Otherwise
+ proxy.getTxMgr().setTXState(null);
+ // if there was a beforeCompletion call then there will be a thread
+ // sitting in the waiting pool to execute afterCompletion. Otherwise
// throw FailedSynchronizationException().
- if (wasBeforeCompletionCalled()) {
- doAfterCompletion(status);
+ if (beforeCompletionCalled) {
+ singleThreadJTAExecutor.executeAfterCompletion(getCancelCriterion(), status);
} else {
// rollback does not run beforeCompletion.
if (status != Status.STATUS_ROLLEDBACK) {
@@ -1087,7 +1123,7 @@ public class TXState implements TXStateInterface {
}
}
- private void doAfterCompletion(int status) {
+ void doAfterCompletion(int status) {
final long opStart = CachePerfStats.getStatTime();
try {
switch (status) {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/AfterCompletionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/AfterCompletionTest.java
new file mode 100644
index 0000000..d94df0e
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/AfterCompletionTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.transaction.Status;
+
+import org.awaitility.Awaitility;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.CancelCriterion;
+
+public class AfterCompletionTest {
+ private AfterCompletion afterCompletion;
+ private CancelCriterion cancelCriterion;
+ private TXState txState;
+ private Thread doOpThread;
+
+ @Before
+ public void setup() {
+ afterCompletion = new AfterCompletion();
+ cancelCriterion = mock(CancelCriterion.class);
+ txState = mock(TXState.class);
+ }
+
+ @Test
+ public void executeThrowsIfCancelCriterionThrows() {
+ doThrow(new RuntimeException()).when(cancelCriterion).checkCancelInProgress(null);
+
+ assertThatThrownBy(() -> afterCompletion.execute(cancelCriterion, Status.STATUS_COMMITTED))
+ .isInstanceOf(RuntimeException.class);
+ }
+
+ @Test
+ public void cancelThrowsIfCancelCriterionThrows() {
+ doThrow(new RuntimeException()).when(cancelCriterion).checkCancelInProgress(null);
+
+ assertThatThrownBy(() -> afterCompletion.cancel(cancelCriterion))
+ .isInstanceOf(RuntimeException.class);
+ }
+
+ @Test
+ public void isStartedReturnsFalseIfNotExecuted() {
+ assertThat(afterCompletion.isStarted()).isFalse();
+ }
+
+ @Test
+ public void isStartedReturnsTrueIfExecuted() {
+ startDoOp();
+
+ afterCompletion.execute(cancelCriterion, Status.STATUS_COMMITTED);
+
+ verifyDoOpFinished();
+ assertThat(afterCompletion.isStarted()).isTrue();
+ }
+
+ @Test
+ public void executeCallsDoAfterCompletion() {
+ startDoOp();
+
+ afterCompletion.execute(cancelCriterion, Status.STATUS_COMMITTED);
+ verifyDoOpFinished();
+ verify(txState, times(1)).doAfterCompletion(eq(Status.STATUS_COMMITTED));
+ }
+
+ @Test
+ public void executeThrowsDoAfterCompletionThrows() {
+ startDoOp();
+ doThrow(new RuntimeException()).when(txState).doAfterCompletion(Status.STATUS_COMMITTED);
+
+ assertThatThrownBy(() -> afterCompletion.execute(cancelCriterion, Status.STATUS_COMMITTED))
+ .isInstanceOf(RuntimeException.class);
+
+ verifyDoOpFinished();
+ }
+
+ @Test
+ public void cancelCallsDoCleanup() {
+ startDoOp();
+
+ afterCompletion.cancel(cancelCriterion);
+ verifyDoOpFinished();
+ verify(txState, times(1)).doCleanup();
+ }
+
+ @Test
+ public void cancelThrowsDoCleanupThrows() {
+ startDoOp();
+ doThrow(new RuntimeException()).when(txState).doCleanup();
+
+ assertThatThrownBy(() -> afterCompletion.cancel(cancelCriterion))
+ .isInstanceOf(RuntimeException.class);
+
+ verifyDoOpFinished();
+ }
+
+ private void startDoOp() {
+ doOpThread = new Thread(() -> afterCompletion.doOp(txState, cancelCriterion));
+ doOpThread.start();
+ Awaitility.await().atMost(60, TimeUnit.SECONDS)
+ .untilAsserted(() -> verify(cancelCriterion, times(1)).checkCancelInProgress(null));
+
+ }
+
+ private void verifyDoOpFinished() {
+ Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> !doOpThread.isAlive());
+ }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BeforeCompletionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BeforeCompletionTest.java
new file mode 100644
index 0000000..1f541b6
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BeforeCompletionTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.TimeUnit;
+
+import org.awaitility.Awaitility;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.SynchronizationCommitConflictException;
+
+public class BeforeCompletionTest {
+
+ private BeforeCompletion beforeCompletion;
+ private CancelCriterion cancelCriterion;
+ private TXState txState;
+
+ @Before
+ public void setup() {
+ beforeCompletion = new BeforeCompletion();
+ cancelCriterion = mock(CancelCriterion.class);
+ txState = mock(TXState.class);
+ }
+
+ @Test
+ public void executeThrowsExceptionIfDoOpFailedWithException() {
+ doThrow(new SynchronizationCommitConflictException("")).when(txState).doBeforeCompletion();
+
+ beforeCompletion.doOp(txState);
+
+ assertThatThrownBy(() -> beforeCompletion.execute(cancelCriterion))
+ .isInstanceOf(SynchronizationCommitConflictException.class);
+ }
+
+ @Test
+ public void doOpCallsDoBeforeCompletion() {
+ beforeCompletion.doOp(txState);
+
+ verify(txState, times(1)).doBeforeCompletion();
+ }
+
+ @Test
+ public void isStartedReturnsFalseIfNotExecuted() {
+ assertThat(beforeCompletion.isStarted()).isFalse();
+ }
+
+ @Test
+ public void isStartedReturnsTrueIfExecuted() {
+ beforeCompletion.doOp(txState);
+ beforeCompletion.execute(cancelCriterion);
+
+ assertThat(beforeCompletion.isStarted()).isTrue();
+ }
+
+ @Test
+ public void executeThrowsIfCancelCriterionThrows() {
+ doThrow(new RuntimeException()).when(cancelCriterion).checkCancelInProgress(null);
+
+ assertThatThrownBy(() -> beforeCompletion.execute(cancelCriterion))
+ .isInstanceOf(RuntimeException.class);
+ }
+
+ @Test
+ public void executeWaitsUntilDoOpFinish() throws Exception {
+ Thread thread = new Thread(() -> beforeCompletion.execute(cancelCriterion));
+ thread.start();
+ // give the thread a chance to get past the "finished" check by waiting until
+ // checkCancelInProgress is called
+ Awaitility.await().atMost(60, TimeUnit.SECONDS)
+ .untilAsserted(() -> verify(cancelCriterion, times(1)).checkCancelInProgress(null));
+
+ beforeCompletion.doOp(txState);
+
+ Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> !(thread.isAlive()));
+ }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java
new file mode 100644
index 0000000..bc48eb4
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.awaitility.Awaitility;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InOrder;
+
+import org.apache.geode.CancelCriterion;
+
+public class SingleThreadJTAExecutorTest {
+ private SingleThreadJTAExecutor singleThreadJTAExecutor;
+ private TXState txState;
+ private ExecutorService executor;
+ private BeforeCompletion beforeCompletion;
+ private AfterCompletion afterCompletion;
+ private CancelCriterion cancelCriterion;
+
+ @Before
+ public void setup() {
+ txState = mock(TXState.class, RETURNS_DEEP_STUBS);
+ executor = Executors.newSingleThreadExecutor();
+ beforeCompletion = mock(BeforeCompletion.class);
+ afterCompletion = mock(AfterCompletion.class);
+ cancelCriterion = mock(CancelCriterion.class);
+ singleThreadJTAExecutor = new SingleThreadJTAExecutor(beforeCompletion, afterCompletion);
+ }
+
+ @Test
+ public void executeBeforeCompletionCallsDoOps() {
+ InOrder inOrder = inOrder(beforeCompletion, afterCompletion);
+
+ singleThreadJTAExecutor.executeBeforeCompletion(txState, executor, cancelCriterion);
+
+ verify(beforeCompletion, times(1)).execute(eq(cancelCriterion));
+ Awaitility.await().atMost(30, TimeUnit.SECONDS)
+ .untilAsserted(() -> inOrder.verify(beforeCompletion, times(1)).doOp(eq(txState)));
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(
+ () -> inOrder.verify(afterCompletion, times(1)).doOp(eq(txState), eq(cancelCriterion)));
+ }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
index 5ec4cbd..21fadf0 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@@ -32,10 +33,9 @@ import org.junit.Before;
import org.junit.Test;
import org.apache.geode.cache.CommitConflictException;
+import org.apache.geode.cache.FailedSynchronizationException;
import org.apache.geode.cache.SynchronizationCommitConflictException;
import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
-import org.apache.geode.cache.TransactionException;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
public class TXStateTest {
private TXStateProxyImpl txStateProxy;
@@ -44,60 +44,51 @@ public class TXStateTest {
@Before
public void setup() {
- txStateProxy = mock(TXStateProxyImpl.class);
+ txStateProxy = mock(TXStateProxyImpl.class, RETURNS_DEEP_STUBS);
exception = new CommitConflictException("");
transactionDataNodeHasDepartedException = new TransactionDataNodeHasDepartedException("");
when(txStateProxy.getTxMgr()).thenReturn(mock(TXManagerImpl.class));
}
-
@Test
- public void beforeCompletionThrowsIfReserveAndCheckFails() {
+ public void doBeforeCompletionThrowsIfReserveAndCheckFails() {
TXState txState = spy(new TXState(txStateProxy, true));
doThrow(exception).when(txState).reserveAndCheck();
- assertThatThrownBy(() -> txState.beforeCompletion())
+ assertThatThrownBy(() -> txState.doBeforeCompletion())
.isInstanceOf(SynchronizationCommitConflictException.class);
}
-
@Test
- public void afterCompletionThrowsIfCommitFails() {
+ public void doAfterCompletionThrowsIfCommitFails() {
TXState txState = spy(new TXState(txStateProxy, true));
- doReturn(mock(InternalCache.class)).when(txState).getCache();
doReturn(true).when(txState).wasBeforeCompletionCalled();
txState.reserveAndCheck();
doThrow(transactionDataNodeHasDepartedException).when(txState).commit();
- assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED))
+ assertThatThrownBy(() -> txState.doAfterCompletion(Status.STATUS_COMMITTED))
.isSameAs(transactionDataNodeHasDepartedException);
}
@Test
- public void afterCompletionThrowsTransactionExceptionIfCommitFailedCommitConflictException() {
- TXState txState = spy(new TXState(txStateProxy, true));
- doReturn(mock(InternalCache.class)).when(txState).getCache();
- doReturn(true).when(txState).wasBeforeCompletionCalled();
- doThrow(exception).when(txState).commit();
-
- assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED))
- .isInstanceOf(TransactionException.class);
- }
-
- @Test
- public void afterCompletionCanCommitJTA() {
+ public void doAfterCompletionCanCommitJTA() {
TXState txState = spy(new TXState(txStateProxy, false));
- doReturn(mock(InternalCache.class)).when(txState).getCache();
txState.reserveAndCheck();
txState.closed = true;
doReturn(true).when(txState).wasBeforeCompletionCalled();
- txState.afterCompletion(Status.STATUS_COMMITTED);
+ txState.doAfterCompletion(Status.STATUS_COMMITTED);
assertThat(txState.locks).isNull();
verify(txState, times(1)).saveTXCommitMessageForClientFailover();
}
+  /**
+   * Calling {@code afterCompletion} on a freshly constructed {@link TXState}, on which
+   * {@code beforeCompletion} has never been invoked, must fail with
+   * {@link FailedSynchronizationException}.
+   */
+  @Test
+  public void afterCompletionThrowsExceptionIfBeforeCompletionNotCalled() {
+    TXState txState = new TXState(txStateProxy, true);
+
+    // Scope the expectation to the afterCompletion call itself so that an exception thrown while
+    // constructing the TXState cannot make this test pass by accident.
+    assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED))
+        .isInstanceOf(FailedSynchronizationException.class);
+  }
+
@Test
public void afterCompletionCanRollbackJTA() {
TXState txState = spy(new TXState(txStateProxy, true));
@@ -153,16 +144,7 @@ public class TXStateTest {
public void getOriginatingMemberReturnsNullIfNotOriginatedFromClient() {
TXState txState = spy(new TXState(txStateProxy, false));
- assertThat(txState.getOriginatingMember()).isNull();
+ assertThat(txState.getOriginatingMember()).isSameAs(txStateProxy.getOnBehalfOfClientMember());
}
- @Test
- public void getOriginatingMemberReturnsClientMemberIfOriginatedFromClient() {
- InternalDistributedMember client = mock(InternalDistributedMember.class);
- TXStateProxyImpl proxy = new TXStateProxyImpl(mock(InternalCache.class),
- mock(TXManagerImpl.class), mock(TXId.class), client);
- TXState txState = spy(new TXState(proxy, false));
-
- assertThat(txState.getOriginatingMember()).isEqualTo(client);
- }
}