You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2018/08/27 20:11:40 UTC
[geode] branch feature/GEODE-5624 updated: more refactoring.
This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch feature/GEODE-5624
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-5624 by this push:
new 13b6cfe more refactoring.
13b6cfe is described below
commit 13b6cfe2cf2e1709a12d47e1dce7030af57e89f2
Author: eshu <es...@pivotal.io>
AuthorDate: Mon Aug 27 13:11:04 2018 -0700
more refactoring.
---
.../geode/internal/cache/AfterCompletion.java | 105 +++++++++++++
.../geode/internal/cache/BeforeCompletion.java | 64 ++++++++
.../internal/cache/SingleThreadJTAExecutor.java | 172 +++------------------
.../org/apache/geode/internal/cache/TXState.java | 11 +-
.../geode/internal/cache/AfterCompletionTest.java | 131 ++++++++++++++++
.../geode/internal/cache/BeforeCompletionTest.java | 98 ++++++++++++
.../cache/SingleThreadJTAExecutorTest.java | 84 +++-------
7 files changed, 447 insertions(+), 218 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AfterCompletion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AfterCompletion.java
new file mode 100644
index 0000000..02ef92c
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AfterCompletion.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import java.util.function.BooleanSupplier;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.internal.logging.LogService;
+
+/**
+ * Hands the afterCompletion step of a transaction over to the thread that runs
+ * {@link #doOp(TXState, CancelCriterion)}.
+ *
+ * <p>
+ * The thread inside {@code doOp} parks until another thread calls
+ * {@link #execute(CancelCriterion, int)} (supplying the transaction status) or
+ * {@link #cancel(CancelCriterion)}. That caller in turn blocks until {@code doOp} has finished
+ * and rethrows any {@link RuntimeException} recorded there. Every field is read and written
+ * only while holding this object's monitor.
+ */
+public class AfterCompletion {
+  private static final Logger logger = LogService.getLogger();
+
+  private boolean started;
+  private boolean finished;
+  // -1 means execute() has not delivered a status yet
+  private int status = -1;
+  private boolean cancelled;
+  private RuntimeException exception;
+
+  /**
+   * Waits for a status (from {@code execute}) or a cancellation (from {@code cancel}), then runs
+   * {@code txState.doAfterCompletion(status)} or {@code txState.doCleanup()} respectively.
+   *
+   * <p>
+   * A {@link RuntimeException} thrown by either call is stored rather than propagated so the
+   * thread blocked in {@code execute}/{@code cancel} can rethrow it. Waiters are always woken,
+   * whether the call succeeded or failed.
+   *
+   * @param txState the transaction state to complete or clean up
+   * @param cancelCriterion consulted before every wait; whatever it throws propagates
+   */
+  public synchronized void doOp(TXState txState, CancelCriterion cancelCriterion) {
+    // Per the original author's note, a transaction/JTA timeout is expected to run the cleanup
+    // code (and thereby cancel this wait) if the client goes away, so this thread should not
+    // sit here forever.
+    awaitWhile(cancelCriterion, () -> status == -1 && !cancelled);
+    started = true;
+    logger.debug("executing afterCompletion notification");
+
+    try {
+      if (!cancelled) {
+        txState.doAfterCompletion(status);
+      } else {
+        txState.doCleanup();
+      }
+    } catch (RuntimeException failure) {
+      exception = failure;
+    } finally {
+      logger.debug("afterCompletion notification completed");
+      finished = true;
+      notifyAll();
+    }
+  }
+
+  /**
+   * Blocks on this object's monitor for as long as {@code stillWaiting} reports {@code true}.
+   * The cancel criterion is checked before each wait, and each wait lasts at most one second.
+   */
+  private synchronized void awaitWhile(CancelCriterion cancelCriterion,
+      BooleanSupplier stillWaiting) {
+    while (stillWaiting.getAsBoolean()) {
+      cancelCriterion.checkCancelInProgress(null);
+      try {
+        logger.debug("waiting for notification");
+        wait(1000);
+      } catch (InterruptedException ignore) {
+        // the interrupt is deliberately dropped; the loop re-evaluates its exit conditions
+      }
+    }
+  }
+
+  /**
+   * Delivers the transaction status to {@code doOp} and blocks until {@code doOp} has finished.
+   *
+   * @param cancelCriterion consulted while waiting; whatever it throws propagates
+   * @param status the status passed on to {@code txState.doAfterCompletion}
+   * @throws RuntimeException the exception recorded by {@code doOp}, if any
+   */
+  public synchronized void execute(CancelCriterion cancelCriterion, int status) {
+    this.status = status;
+    wakeDoOpAndAwaitResult(cancelCriterion);
+  }
+
+  /**
+   * Asks {@code doOp} to run cleanup instead of afterCompletion and blocks until it has finished.
+   *
+   * @param cancelCriterion consulted while waiting; whatever it throws propagates
+   * @throws RuntimeException the exception recorded by {@code doOp}, if any
+   */
+  public synchronized void cancel(CancelCriterion cancelCriterion) {
+    cancelled = true;
+    wakeDoOpAndAwaitResult(cancelCriterion);
+  }
+
+  /**
+   * Wakes any thread parked in {@code doOp}, waits until {@code finished} is set, and rethrows
+   * the failure recorded by {@code doOp}, if there is one.
+   */
+  private void wakeDoOpAndAwaitResult(CancelCriterion cancelCriterion) {
+    notifyAll();
+    awaitWhile(cancelCriterion, () -> !finished);
+    if (exception == null) {
+      return;
+    }
+    throw exception;
+  }
+
+  /**
+   * @return {@code true} once {@code doOp} has stopped waiting and begun its work
+   */
+  public synchronized boolean isStarted() {
+    return started;
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BeforeCompletion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BeforeCompletion.java
new file mode 100644
index 0000000..99a062c
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BeforeCompletion.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.SynchronizationCommitConflictException;
+import org.apache.geode.internal.logging.LogService;
+
+/**
+ * Lets one thread run the beforeCompletion step through {@link #doOp(TXState)} while another
+ * thread, blocked in {@link #execute(CancelCriterion)}, waits for its outcome.
+ *
+ * <p>
+ * Every field is read and written only while holding this object's monitor.
+ */
+public class BeforeCompletion {
+  private static final Logger logger = LogService.getLogger();
+
+  private boolean started;
+  private boolean finished;
+  private SynchronizationCommitConflictException exception;
+
+  /**
+   * Invokes {@code txState.doBeforeCompletion()}, then marks this step finished and wakes any
+   * waiter, whether the call succeeded or threw.
+   *
+   * <p>
+   * A {@link SynchronizationCommitConflictException} is stored so that {@code execute} can
+   * rethrow it; any other exception propagates to the caller of this method.
+   *
+   * @param txState the transaction state whose beforeCompletion work is run
+   */
+  public synchronized void doOp(TXState txState) {
+    try {
+      txState.doBeforeCompletion();
+    } catch (SynchronizationCommitConflictException conflict) {
+      exception = conflict;
+    } finally {
+      logger.debug("beforeCompletion notification completed");
+      finished = true;
+      notifyAll();
+    }
+  }
+
+  /**
+   * Marks this step as started and blocks until {@code doOp} has finished. The cancel criterion
+   * is checked before each wait, and each wait lasts at most one second.
+   *
+   * @param cancelCriterion consulted while waiting; whatever it throws propagates
+   * @throws SynchronizationCommitConflictException if {@code doOp} recorded one
+   */
+  public synchronized void execute(CancelCriterion cancelCriterion) {
+    started = true;
+    while (!finished) {
+      cancelCriterion.checkCancelInProgress(null);
+      try {
+        wait(1000);
+      } catch (InterruptedException ignore) {
+        // the interrupt is deliberately dropped; the loop re-evaluates its exit conditions
+      }
+    }
+    if (exception == null) {
+      return;
+    }
+    throw exception;
+  }
+
+  /**
+   * @return {@code true} once {@code execute} has been entered
+   */
+  public synchronized boolean isStarted() {
+    return started;
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java
index 4df8ca4..7ecca6a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java
@@ -18,180 +18,54 @@ import java.util.concurrent.Executor;
import org.apache.logging.log4j.Logger;
-import org.apache.geode.cache.SynchronizationCommitConflictException;
+import org.apache.geode.CancelCriterion;
import org.apache.geode.internal.logging.LogService;
/**
- * TXStateSynchronizationThread manages beforeCompletion and afterCompletion calls.
- * The thread should be instantiated with a Runnable that invokes beforeCompletion behavior.
- * Then you must invoke executeAfterCompletion() with another Runnable that invokes afterCompletion
- * behavior.
+ * This class ensures that beforeCompletion and afterCompletion are executed in the same thread.
*
- * @since Geode 1.6.0
+ * @since Geode 1.7.0
*/
public class SingleThreadJTAExecutor {
private static final Logger logger = LogService.getLogger();
- private final Object beforeCompletionSync = new Object();
- private boolean beforeCompletionStarted;
- private boolean beforeCompletionFinished;
- private SynchronizationCommitConflictException beforeCompletionException;
+ private final BeforeCompletion beforeCompletion;
+ private final AfterCompletion afterCompletion;
- private final Object afterCompletionSync = new Object();
- private boolean afterCompletionStarted;
- private boolean afterCompletionFinished;
- private int afterCompletionStatus = -1;
- private boolean afterCompletionCancelled;
- private RuntimeException afterCompletionException;
-
- public SingleThreadJTAExecutor() {}
-
- void doOps(TXState txState) {
- doBeforeCompletionOp(txState);
- doAfterCompletionOp(txState);
- }
-
- void doBeforeCompletionOp(TXState txState) {
- synchronized (beforeCompletionSync) {
- try {
- txState.doBeforeCompletion();
- } catch (SynchronizationCommitConflictException exception) {
- beforeCompletionException = exception;
- } finally {
- if (logger.isDebugEnabled()) {
- logger.debug("beforeCompletion notification completed");
- }
- beforeCompletionFinished = true;
- beforeCompletionSync.notifyAll();
- }
- }
- }
-
- boolean isBeforeCompletionStarted() {
- synchronized (beforeCompletionSync) {
- return beforeCompletionStarted;
- }
- }
-
- boolean isAfterCompletionStarted() {
- synchronized (afterCompletionSync) {
- return afterCompletionStarted;
- }
+ public SingleThreadJTAExecutor() {
+ this(new BeforeCompletion(), new AfterCompletion());
}
- boolean isBeforeCompletionFinished() {
- synchronized (beforeCompletionSync) {
- return beforeCompletionFinished;
- }
+ public SingleThreadJTAExecutor(BeforeCompletion beforeCompletion,
+ AfterCompletion afterCompletion) {
+ this.beforeCompletion = beforeCompletion;
+ this.afterCompletion = afterCompletion;
}
- boolean isAfterCompletionFinished() {
- synchronized (afterCompletionSync) {
- return afterCompletionFinished;
- }
+ private void doOps(TXState txState, CancelCriterion cancelCriterion) {
+ beforeCompletion.doOp(txState);
+ afterCompletion.doOp(txState, cancelCriterion);
}
- public void executeBeforeCompletion(TXState txState, Executor executor) {
- executor.execute(() -> doOps(txState));
-
- synchronized (beforeCompletionSync) {
- beforeCompletionStarted = true;
- while (!beforeCompletionFinished) {
- try {
- beforeCompletionSync.wait(1000);
- } catch (InterruptedException ignore) {
- // eat the interrupt and check for exit conditions
- }
- txState.getCache().getCancelCriterion().checkCancelInProgress(null);
- }
- if (getBeforeCompletionException() != null) {
- throw getBeforeCompletionException();
- }
- }
- }
-
- SynchronizationCommitConflictException getBeforeCompletionException() {
- return beforeCompletionException;
- }
-
- private void doAfterCompletionOp(TXState txState) {
- synchronized (afterCompletionSync) {
- // there should be a transaction timeout that keeps this thread
- // from sitting around forever if the client goes away
- // The above was done by setting afterCompletionCancelled in txState
- // during cleanup. When client departed, the transaction/JTA
- // will be timed out and cleanup code will be executed.
- final boolean isDebugEnabled = logger.isDebugEnabled();
- while (afterCompletionStatus == -1 && !afterCompletionCancelled) {
- try {
- if (isDebugEnabled) {
- logger.debug("waiting for afterCompletion notification");
- }
- afterCompletionSync.wait(1000);
- } catch (InterruptedException ignore) {
- // eat the interrupt and check for exit conditions
- }
- }
- afterCompletionStarted = true;
- if (isDebugEnabled) {
- logger.debug("executing afterCompletion notification");
- }
- try {
- if (!afterCompletionCancelled) {
- txState.doAfterCompletion(afterCompletionStatus);
- } else {
- txState.doCleanup();
- }
- } catch (RuntimeException exception) {
- afterCompletionException = exception;
- } finally {
- if (isDebugEnabled) {
- logger.debug("afterCompletion notification completed");
- }
- afterCompletionFinished = true;
- afterCompletionSync.notifyAll();
- }
- }
- }
-
- public void executeAfterCompletion(TXState txState, int status) {
- synchronized (afterCompletionSync) {
- afterCompletionStatus = status;
- afterCompletionSync.notifyAll();
- waitForAfterCompletionToFinish(txState);
- if (getAfterCompletionException() != null) {
- throw getAfterCompletionException();
- }
- }
- }
+ public void executeBeforeCompletion(TXState txState, Executor executor,
+ CancelCriterion cancelCriterion) {
+ executor.execute(() -> doOps(txState, cancelCriterion));
- private void waitForAfterCompletionToFinish(TXState txState) {
- while (!afterCompletionFinished) {
- try {
- afterCompletionSync.wait(1000);
- } catch (InterruptedException ignore) {
- // eat the interrupt and check for exit conditions
- }
- txState.getCache().getCancelCriterion().checkCancelInProgress(null);
- }
+ beforeCompletion.execute(cancelCriterion);
}
- RuntimeException getAfterCompletionException() {
- return afterCompletionException;
+ public void executeAfterCompletion(CancelCriterion cancelCriterion, int status) {
+ afterCompletion.execute(cancelCriterion, status);
}
/**
* stop waiting for an afterCompletion to arrive and just exit
*/
- public void cleanup(TXState txState) {
- synchronized (afterCompletionSync) {
- afterCompletionCancelled = true;
- afterCompletionSync.notifyAll();
- waitForAfterCompletionToFinish(txState);
- }
+ public void cleanup(CancelCriterion cancelCriterion) {
+ afterCompletion.cancel(cancelCriterion);
}
public boolean shouldDoCleanup() {
- return isBeforeCompletionStarted() && !isAfterCompletionStarted();
+ return beforeCompletion.isStarted() && !afterCompletion.isStarted();
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index bce1af4..83d00d1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -31,6 +31,7 @@ import javax.transaction.Status;
import org.apache.logging.log4j.Logger;
+import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.CommitConflictException;
@@ -869,7 +870,7 @@ public class TXState implements TXStateInterface {
protected void cleanup() {
if (singleThreadJTAExecutor.shouldDoCleanup()) {
- singleThreadJTAExecutor.cleanup(this);
+ singleThreadJTAExecutor.cleanup(getCancelCriterion());
} else {
doCleanup();
}
@@ -1036,13 +1037,17 @@ public class TXState implements TXStateInterface {
}
beforeCompletionCalled = true;
singleThreadJTAExecutor.executeBeforeCompletion(this,
- getExecutor());
+ getExecutor(), getCancelCriterion());
}
Executor getExecutor() {
return getCache().getDistributionManager().getWaitingThreadPool();
}
+ CancelCriterion getCancelCriterion() {
+ return getCache().getCancelCriterion();
+ }
+
void doBeforeCompletion() {
proxy.getTxMgr().setTXState(null);
final long opStart = CachePerfStats.getStatTime();
@@ -1105,7 +1110,7 @@ public class TXState implements TXStateInterface {
// sitting in the waiting pool to execute afterCompletion. Otherwise
// throw FailedSynchronizationException().
if (beforeCompletionCalled) {
- singleThreadJTAExecutor.executeAfterCompletion(this, status);
+ singleThreadJTAExecutor.executeAfterCompletion(getCancelCriterion(), status);
} else {
// rollback does not run beforeCompletion.
if (status != Status.STATUS_ROLLEDBACK) {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/AfterCompletionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/AfterCompletionTest.java
new file mode 100644
index 0000000..d94df0e
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/AfterCompletionTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.transaction.Status;
+
+import org.awaitility.Awaitility;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.CancelCriterion;
+
+/**
+ * Unit tests for {@link AfterCompletion}. Tests that need the consuming side start
+ * {@code doOp} on a separate thread via {@link #startDoOp()}.
+ */
+public class AfterCompletionTest {
+  private AfterCompletion afterCompletion;
+  private CancelCriterion cancelCriterion;
+  private TXState txState;
+  private Thread doOpThread;
+
+  @Before
+  public void setup() {
+    afterCompletion = new AfterCompletion();
+    cancelCriterion = mock(CancelCriterion.class);
+    txState = mock(TXState.class);
+  }
+
+  @Test
+  public void executeThrowsIfCancelCriterionThrows() {
+    doThrow(new RuntimeException()).when(cancelCriterion).checkCancelInProgress(null);
+
+    assertThatThrownBy(() -> afterCompletion.execute(cancelCriterion, Status.STATUS_COMMITTED))
+        .isInstanceOf(RuntimeException.class);
+  }
+
+  @Test
+  public void cancelThrowsIfCancelCriterionThrows() {
+    doThrow(new RuntimeException()).when(cancelCriterion).checkCancelInProgress(null);
+
+    assertThatThrownBy(() -> afterCompletion.cancel(cancelCriterion))
+        .isInstanceOf(RuntimeException.class);
+  }
+
+  @Test
+  public void isStartedReturnsFalseIfNotExecuted() {
+    assertThat(afterCompletion.isStarted()).isFalse();
+  }
+
+  @Test
+  public void isStartedReturnsTrueIfExecuted() {
+    startDoOp();
+
+    afterCompletion.execute(cancelCriterion, Status.STATUS_COMMITTED);
+
+    verifyDoOpFinished();
+    assertThat(afterCompletion.isStarted()).isTrue();
+  }
+
+  @Test
+  public void executeCallsDoAfterCompletion() {
+    startDoOp();
+
+    afterCompletion.execute(cancelCriterion, Status.STATUS_COMMITTED);
+    verifyDoOpFinished();
+    verify(txState, times(1)).doAfterCompletion(eq(Status.STATUS_COMMITTED));
+  }
+
+  @Test
+  public void executeThrowsDoAfterCompletionThrows() {
+    // stub before the doOp thread exists so the mock is never configured while shared
+    doThrow(new RuntimeException()).when(txState).doAfterCompletion(Status.STATUS_COMMITTED);
+    startDoOp();
+
+    assertThatThrownBy(() -> afterCompletion.execute(cancelCriterion, Status.STATUS_COMMITTED))
+        .isInstanceOf(RuntimeException.class);
+
+    verifyDoOpFinished();
+  }
+
+  @Test
+  public void cancelCallsDoCleanup() {
+    startDoOp();
+
+    afterCompletion.cancel(cancelCriterion);
+    verifyDoOpFinished();
+    verify(txState, times(1)).doCleanup();
+  }
+
+  @Test
+  public void cancelThrowsDoCleanupThrows() {
+    // stub before the doOp thread exists so the mock is never configured while shared
+    doThrow(new RuntimeException()).when(txState).doCleanup();
+    startDoOp();
+
+    assertThatThrownBy(() -> afterCompletion.cancel(cancelCriterion))
+        .isInstanceOf(RuntimeException.class);
+
+    verifyDoOpFinished();
+  }
+
+  /**
+   * Starts {@code doOp} on its own thread and returns once that thread has reached its wait
+   * loop, detected by the first {@code checkCancelInProgress} call.
+   */
+  private void startDoOp() {
+    doOpThread = new Thread(() -> afterCompletion.doOp(txState, cancelCriterion));
+    doOpThread.start();
+    // atLeastOnce rather than times(1): the wait loop calls checkCancelInProgress again after
+    // every one-second timeout, so an exact count of one fails for good if this thread is
+    // stalled for more than a second.
+    Awaitility.await().atMost(60, TimeUnit.SECONDS)
+        .untilAsserted(() -> verify(cancelCriterion, org.mockito.Mockito.atLeastOnce())
+            .checkCancelInProgress(null));
+  }
+
+  /** Waits (up to 60 seconds) for the thread started by {@link #startDoOp()} to terminate. */
+  private void verifyDoOpFinished() {
+    Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> !doOpThread.isAlive());
+  }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BeforeCompletionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BeforeCompletionTest.java
new file mode 100644
index 0000000..1f541b6
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BeforeCompletionTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.TimeUnit;
+
+import org.awaitility.Awaitility;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.SynchronizationCommitConflictException;
+
+/**
+ * Unit tests for {@link BeforeCompletion}.
+ */
+public class BeforeCompletionTest {
+
+  private BeforeCompletion beforeCompletion;
+  private CancelCriterion cancelCriterion;
+  private TXState txState;
+
+  @Before
+  public void setup() {
+    beforeCompletion = new BeforeCompletion();
+    cancelCriterion = mock(CancelCriterion.class);
+    txState = mock(TXState.class);
+  }
+
+  @Test
+  public void executeThrowsExceptionIfDoOpFailedWithException() {
+    doThrow(new SynchronizationCommitConflictException("")).when(txState).doBeforeCompletion();
+
+    beforeCompletion.doOp(txState);
+
+    assertThatThrownBy(() -> beforeCompletion.execute(cancelCriterion))
+        .isInstanceOf(SynchronizationCommitConflictException.class);
+  }
+
+  @Test
+  public void doOpCallsDoBeforeCompletion() {
+    beforeCompletion.doOp(txState);
+
+    verify(txState, times(1)).doBeforeCompletion();
+  }
+
+  @Test
+  public void isStartedReturnsFalseIfNotExecuted() {
+    assertThat(beforeCompletion.isStarted()).isFalse();
+  }
+
+  @Test
+  public void isStartedReturnsTrueIfExecuted() {
+    beforeCompletion.doOp(txState);
+    beforeCompletion.execute(cancelCriterion);
+
+    assertThat(beforeCompletion.isStarted()).isTrue();
+  }
+
+  @Test
+  public void executeThrowsIfCancelCriterionThrows() {
+    doThrow(new RuntimeException()).when(cancelCriterion).checkCancelInProgress(null);
+
+    assertThatThrownBy(() -> beforeCompletion.execute(cancelCriterion))
+        .isInstanceOf(RuntimeException.class);
+  }
+
+  @Test
+  public void executeWaitsUntilDoOpFinish() {
+    Thread thread = new Thread(() -> beforeCompletion.execute(cancelCriterion));
+    thread.start();
+    // Give the thread a chance to get past the "finished" check by waiting until
+    // checkCancelInProgress is called. atLeastOnce rather than times(1): the wait loop calls
+    // it again after every one-second timeout, so an exact count of one fails for good if
+    // this thread is stalled for more than a second.
+    Awaitility.await().atMost(60, TimeUnit.SECONDS)
+        .untilAsserted(() -> verify(cancelCriterion, org.mockito.Mockito.atLeastOnce())
+            .checkCancelInProgress(null));
+
+    beforeCompletion.doOp(txState);
+
+    Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> !(thread.isAlive()));
+  }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java
index 79f4324..1cf70a3 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java
@@ -14,10 +14,9 @@
*/
package org.apache.geode.internal.cache;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.assertj.core.api.Java6Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -25,85 +24,38 @@ import static org.mockito.Mockito.verify;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import javax.transaction.Status;
-
import org.junit.Before;
import org.junit.Test;
+import org.mockito.InOrder;
-import org.apache.geode.cache.SynchronizationCommitConflictException;
-import org.apache.geode.cache.TransactionException;
+import org.apache.geode.CancelCriterion;
public class SingleThreadJTAExecutorTest {
- private TXState txState;
private SingleThreadJTAExecutor singleThreadJTAExecutor;
+ private TXState txState;
private ExecutorService executor;
+ private BeforeCompletion beforeCompletion;
+ private AfterCompletion afterCompletion;
+ private CancelCriterion cancelCriterion;
@Before
public void setup() {
txState = mock(TXState.class, RETURNS_DEEP_STUBS);
executor = Executors.newSingleThreadExecutor();
+ beforeCompletion = mock(BeforeCompletion.class);
+ afterCompletion = mock(AfterCompletion.class);
+ cancelCriterion = mock(CancelCriterion.class);
+ singleThreadJTAExecutor = new SingleThreadJTAExecutor(beforeCompletion, afterCompletion);
}
@Test
- public void executeBeforeCompletionCallsDoBeforeCompletion() {
- singleThreadJTAExecutor = new SingleThreadJTAExecutor();
-
- singleThreadJTAExecutor.executeBeforeCompletion(txState, executor);
-
- verify(txState, times(1)).doBeforeCompletion();
-
- assertThat(singleThreadJTAExecutor.isBeforeCompletionFinished()).isTrue();
- }
-
- @Test(expected = SynchronizationCommitConflictException.class)
- public void executeBeforeCompletionThrowsExceptionIfBeforeCompletionFailed() {
- singleThreadJTAExecutor = new SingleThreadJTAExecutor();
- doThrow(new SynchronizationCommitConflictException("")).when(txState).doBeforeCompletion();
-
- singleThreadJTAExecutor.executeBeforeCompletion(txState, executor);
-
- verify(txState, times(1)).doBeforeCompletion();
- assertThat(singleThreadJTAExecutor.isBeforeCompletionFinished()).isTrue();
- }
-
- @Test
- public void executeAfterCompletionCallsDoAfterCompletion() {
- singleThreadJTAExecutor = new SingleThreadJTAExecutor();
- int status = Status.STATUS_COMMITTED;
-
- singleThreadJTAExecutor.executeBeforeCompletion(txState, executor);
- singleThreadJTAExecutor.executeAfterCompletion(txState, status);
-
- verify(txState, times(1)).doBeforeCompletion();
- verify(txState, times(1)).doAfterCompletion(status);
- assertThat(singleThreadJTAExecutor.isBeforeCompletionFinished()).isTrue();
- }
-
- @Test
- public void executeAfterCompletionThrowsExceptionIfAfterCompletionFailed() {
- singleThreadJTAExecutor = new SingleThreadJTAExecutor();
- int status = Status.STATUS_COMMITTED;
- TransactionException exception = new TransactionException("");
- doThrow(exception).when(txState).doAfterCompletion(status);
-
- singleThreadJTAExecutor.executeBeforeCompletion(txState, executor);
-
- assertThatThrownBy(() -> singleThreadJTAExecutor.executeAfterCompletion(txState, status))
- .isSameAs(exception);
- verify(txState, times(1)).doBeforeCompletion();
- verify(txState, times(1)).doAfterCompletion(status);
- }
-
- @Test
- public void executorThreadNoLongerWaitForAfterCompletionIfTXStateIsCleanedUp() {
- singleThreadJTAExecutor = new SingleThreadJTAExecutor();
-
- singleThreadJTAExecutor.executeBeforeCompletion(txState, executor);
- singleThreadJTAExecutor.cleanup(txState);
+ public void executeBeforeCompletionCallsDoOps() {
+ singleThreadJTAExecutor.executeBeforeCompletion(txState, executor, cancelCriterion);
- verify(txState, times(1)).doBeforeCompletion();
- assertThat(singleThreadJTAExecutor.isBeforeCompletionFinished()).isTrue();
- assertThat(singleThreadJTAExecutor.isAfterCompletionFinished()).isTrue();
+ InOrder inOrder = inOrder(beforeCompletion, afterCompletion);
+ inOrder.verify(beforeCompletion, times(1)).doOp(eq(txState));
+ inOrder.verify(afterCompletion, times(1)).doOp(eq(txState), eq(cancelCriterion));
+ verify(beforeCompletion, times(1)).execute(eq(cancelCriterion));
}
}