You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2018/09/10 19:52:26 UTC
[geode] branch feature/GEODE-5697 updated: refactor afterCompletion
code.
This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch feature/GEODE-5697
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-5697 by this push:
new d89d29d refactor afterCompletion code.
d89d29d is described below
commit d89d29ddfcf52b3cb09003d84cc1347488987ef3
Author: eshu <es...@pivotal.io>
AuthorDate: Mon Sep 10 12:51:56 2018 -0700
refactor afterCompletion code.
---
.../geode/internal/cache/AfterCompletion.java | 41 ++++++++++----
.../internal/cache/SingleThreadJTAExecutor.java | 8 ++-
.../org/apache/geode/internal/cache/TXState.java | 63 ++++++++++++----------
.../geode/internal/cache/AfterCompletionTest.java | 46 ++++++++++++----
.../apache/geode/internal/cache/TXStateTest.java | 40 ++++++++++++--
5 files changed, 144 insertions(+), 54 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AfterCompletion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AfterCompletion.java
index a63bece..d0495f1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AfterCompletion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AfterCompletion.java
@@ -26,10 +26,14 @@ public class AfterCompletion {
private boolean started;
private boolean finished;
- private int status = -1;
- private boolean cancelled;
private RuntimeException exception;
+ private enum Action {
+ COMMIT, ROLLBACK, CANCEL
+ };
+
+ private Action action;
+
public synchronized void doOp(TXState txState, CancelCriterion cancelCriterion) {
// there should be a transaction timeout that keeps this thread
// from sitting around forever if the client goes away
@@ -39,15 +43,21 @@ public class AfterCompletion {
try {
waitForExecuteOrCancel(cancelCriterion);
} catch (RuntimeException | Error ignore) {
- cancelled = true;
+ action = Action.CANCEL;
}
started = true;
logger.debug("executing afterCompletion notification");
try {
- if (cancelled) {
- txState.doCleanup();
- } else {
- txState.doAfterCompletion(status);
+ switch (action) {
+ case CANCEL:
+ txState.doCleanup();
+ break;
+ case COMMIT:
+ txState.doAfterCompletionCommit();
+ break;
+ case ROLLBACK:
+ txState.doAfterCompletionRollback();
+ break;
}
} catch (RuntimeException exception) {
this.exception = exception;
@@ -59,7 +69,7 @@ public class AfterCompletion {
}
private void waitForExecuteOrCancel(CancelCriterion cancelCriterion) {
- waitForCondition(cancelCriterion, () -> status != -1 || cancelled);
+ waitForCondition(cancelCriterion, () -> action != null);
}
private synchronized void waitForCondition(CancelCriterion cancelCriterion,
@@ -77,12 +87,21 @@ public class AfterCompletion {
}
}
- public synchronized void execute(int status) {
- this.status = status;
+ public void executeCommit() {
+ executeAction(Action.COMMIT);
+ }
+
+ public void executeRollback() {
+ executeAction(Action.ROLLBACK);
+ }
+
+ private synchronized void executeAction(Action action) {
+ this.action = action;
signalAndWaitForDoOp();
if (exception != null) {
throw exception;
}
+
}
private void signalAndWaitForDoOp() {
@@ -95,7 +114,7 @@ public class AfterCompletion {
}
public synchronized void cancel() {
- cancelled = true;
+ action = Action.CANCEL;
signalAndWaitForDoOp();
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java
index 218fa1d..6413748 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java
@@ -57,8 +57,12 @@ public class SingleThreadJTAExecutor {
beforeCompletion.execute(cancelCriterion);
}
- public void executeAfterCompletion(int status) {
- afterCompletion.execute(status);
+ public void executeAfterCompletionCommit() {
+ afterCompletion.executeCommit();
+ }
+
+ public void executeAfterCompletionRollback() {
+ afterCompletion.executeRollback();
}
/**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index e799b25..4e2857b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -1111,45 +1111,54 @@ public class TXState implements TXStateInterface {
// if there was a beforeCompletion call then there will be a thread
// sitting in the waiting pool to execute afterCompletion. Otherwise
// throw FailedSynchronizationException().
- if (beforeCompletionCalled) {
- singleThreadJTAExecutor.executeAfterCompletion(status);
+ if (wasBeforeCompletionCalled()) {
+ switch (status) {
+ case Status.STATUS_COMMITTED:
+ singleThreadJTAExecutor.executeAfterCompletionCommit();
+ break;
+ case Status.STATUS_ROLLEDBACK:
+ singleThreadJTAExecutor.executeAfterCompletionRollback();
+ break;
+ default:
+ throw new TransactionException("Unknown JTA Synchronization status " + status);
+ }
} else {
// rollback does not run beforeCompletion.
if (status != Status.STATUS_ROLLEDBACK) {
throw new FailedSynchronizationException(
"Could not execute afterCompletion when beforeCompletion was not executed");
}
- doAfterCompletion(status);
+ doAfterCompletionRollback();
}
}
- void doAfterCompletion(int status) {
+ void doAfterCompletionCommit() {
final long opStart = CachePerfStats.getStatTime();
try {
- switch (status) {
- case Status.STATUS_COMMITTED:
- Assert.assertTrue(this.locks != null,
- "Gemfire Transaction afterCompletion called with illegal state.");
- try {
- commit();
- saveTXCommitMessageForClientFailover();
- } catch (CommitConflictException error) {
- Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId()
- + " afterCompletion failed.due to CommitConflictException: " + error);
- }
-
- this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime, this);
- this.locks = null;
- break;
- case Status.STATUS_ROLLEDBACK:
- this.jtaLifeTime = opStart - getBeginTime();
- rollback();
- saveTXCommitMessageForClientFailover();
- this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this);
- break;
- default:
- Assert.assertTrue(false, "Unknown JTA Synchronization status " + status);
+ Assert.assertTrue(this.locks != null,
+ "Gemfire Transaction afterCompletion called with illegal state.");
+ try {
+ commit();
+ saveTXCommitMessageForClientFailover();
+ } catch (CommitConflictException error) {
+ Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId()
+ + " afterCompletion failed.due to CommitConflictException: " + error);
}
+ this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime, this);
+ this.locks = null;
+
+ } catch (InternalGemFireError error) {
+ throw new TransactionException(error);
+ }
+ }
+
+ void doAfterCompletionRollback() {
+ final long opStart = CachePerfStats.getStatTime();
+ this.jtaLifeTime = opStart - getBeginTime();
+ try {
+ rollback();
+ saveTXCommitMessageForClientFailover();
+ this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this);
} catch (InternalGemFireError error) {
throw new TransactionException(error);
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/AfterCompletionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/AfterCompletionTest.java
index d12cdb3..bb0d1ad 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/AfterCompletionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/AfterCompletionTest.java
@@ -16,7 +16,6 @@ package org.apache.geode.internal.cache;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -24,8 +23,6 @@ import static org.mockito.Mockito.verify;
import java.util.concurrent.TimeUnit;
-import javax.transaction.Status;
-
import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.Test;
@@ -54,33 +51,62 @@ public class AfterCompletionTest {
public void isStartedReturnsTrueIfExecuted() {
startDoOp();
- afterCompletion.execute(Status.STATUS_COMMITTED);
+ afterCompletion.executeCommit();
verifyDoOpFinished();
assertThat(afterCompletion.isStarted()).isTrue();
}
@Test
- public void executeCallsDoAfterCompletion() {
+ public void executeCallsDoAfterCompletionCommit() {
+ startDoOp();
+
+ afterCompletion.executeCommit();
+ verifyDoOpFinished();
+ verify(txState, times(1)).doAfterCompletionCommit();
+ }
+
+ @Test
+ public void executeThrowsDoAfterCompletionCommitThrows() {
+ startDoOp();
+ doThrow(new RuntimeException()).when(txState).doAfterCompletionCommit();
+
+ assertThatThrownBy(() -> afterCompletion.executeCommit())
+ .isInstanceOf(RuntimeException.class);
+
+ verifyDoOpFinished();
+ }
+
+ @Test
+ public void executeCallsDoAfterCompletionRollback() {
startDoOp();
- afterCompletion.execute(Status.STATUS_COMMITTED);
+ afterCompletion.executeRollback();
verifyDoOpFinished();
- verify(txState, times(1)).doAfterCompletion(eq(Status.STATUS_COMMITTED));
+ verify(txState, times(1)).doAfterCompletionRollback();
}
@Test
- public void executeThrowsDoAfterCompletionThrows() {
+ public void executeThrowsDoAfterCompletionRollbackThrows() {
startDoOp();
- doThrow(new RuntimeException()).when(txState).doAfterCompletion(Status.STATUS_COMMITTED);
+ doThrow(new RuntimeException()).when(txState).doAfterCompletionRollback();
- assertThatThrownBy(() -> afterCompletion.execute(Status.STATUS_COMMITTED))
+ assertThatThrownBy(() -> afterCompletion.executeRollback())
.isInstanceOf(RuntimeException.class);
verifyDoOpFinished();
}
@Test
+ public void doOpInvokesDoCleanupIfCancelCriteriaThrows() {
+ doThrow(new RuntimeException()).when(cancelCriterion).checkCancelInProgress(null);
+
+ afterCompletion.doOp(txState, cancelCriterion);
+
+ verify(txState).doCleanup();
+ }
+
+ @Test
public void cancelCallsDoCleanup() {
startDoOp();
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
index 21fadf0..d65c361 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.catchThrowable;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
@@ -36,17 +37,20 @@ import org.apache.geode.cache.CommitConflictException;
import org.apache.geode.cache.FailedSynchronizationException;
import org.apache.geode.cache.SynchronizationCommitConflictException;
import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
+import org.apache.geode.cache.TransactionException;
public class TXStateTest {
private TXStateProxyImpl txStateProxy;
private CommitConflictException exception;
private TransactionDataNodeHasDepartedException transactionDataNodeHasDepartedException;
+ private SingleThreadJTAExecutor executor;
@Before
public void setup() {
txStateProxy = mock(TXStateProxyImpl.class, RETURNS_DEEP_STUBS);
exception = new CommitConflictException("");
transactionDataNodeHasDepartedException = new TransactionDataNodeHasDepartedException("");
+ executor = mock(SingleThreadJTAExecutor.class);
when(txStateProxy.getTxMgr()).thenReturn(mock(TXManagerImpl.class));
}
@@ -63,11 +67,10 @@ public class TXStateTest {
@Test
public void doAfterCompletionThrowsIfCommitFails() {
TXState txState = spy(new TXState(txStateProxy, true));
- doReturn(true).when(txState).wasBeforeCompletionCalled();
txState.reserveAndCheck();
doThrow(transactionDataNodeHasDepartedException).when(txState).commit();
- assertThatThrownBy(() -> txState.doAfterCompletion(Status.STATUS_COMMITTED))
+ assertThatThrownBy(() -> txState.doAfterCompletionCommit())
.isSameAs(transactionDataNodeHasDepartedException);
}
@@ -76,8 +79,7 @@ public class TXStateTest {
TXState txState = spy(new TXState(txStateProxy, false));
txState.reserveAndCheck();
txState.closed = true;
- doReturn(true).when(txState).wasBeforeCompletionCalled();
- txState.doAfterCompletion(Status.STATUS_COMMITTED);
+ txState.doAfterCompletionCommit();
assertThat(txState.locks).isNull();
verify(txState, times(1)).saveTXCommitMessageForClientFailover();
@@ -90,6 +92,36 @@ public class TXStateTest {
}
@Test
+ public void afterCompletionInvokesExecuteAfterCompletionCommitIfBeforeCompletionCalled() {
+ TXState txState = spy(new TXState(txStateProxy, true, executor));
+ doReturn(true).when(txState).wasBeforeCompletionCalled();
+
+ txState.afterCompletion(Status.STATUS_COMMITTED);
+
+ verify(executor, times(1)).executeAfterCompletionCommit();
+ }
+
+ @Test
+ public void afterCompletionThrowsWithUnexpectedStatusIfBeforeCompletionCalled() {
+ TXState txState = spy(new TXState(txStateProxy, true, executor));
+ doReturn(true).when(txState).wasBeforeCompletionCalled();
+
+ Throwable thrown = catchThrowable(() -> txState.afterCompletion(Status.STATUS_NO_TRANSACTION));
+
+ assertThat(thrown).isInstanceOf(TransactionException.class);
+ }
+
+ @Test
+ public void afterCompletionInvokesExecuteAfterCompletionRollbackIfBeforeCompletionCalled() {
+ TXState txState = spy(new TXState(txStateProxy, true, executor));
+ doReturn(true).when(txState).wasBeforeCompletionCalled();
+
+ txState.afterCompletion(Status.STATUS_ROLLEDBACK);
+
+ verify(executor, times(1)).executeAfterCompletionRollback();
+ }
+
+ @Test
public void afterCompletionCanRollbackJTA() {
TXState txState = spy(new TXState(txStateProxy, true));
txState.afterCompletion(Status.STATUS_ROLLEDBACK);