You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2018/09/10 19:52:26 UTC

[geode] branch feature/GEODE-5697 updated: refactor afterCompletion code.

This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch feature/GEODE-5697
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-5697 by this push:
     new d89d29d  refactor afterCompletion code.
d89d29d is described below

commit d89d29ddfcf52b3cb09003d84cc1347488987ef3
Author: eshu <es...@pivotal.io>
AuthorDate: Mon Sep 10 12:51:56 2018 -0700

    refactor afterCompletion code.
---
 .../geode/internal/cache/AfterCompletion.java      | 41 ++++++++++----
 .../internal/cache/SingleThreadJTAExecutor.java    |  8 ++-
 .../org/apache/geode/internal/cache/TXState.java   | 63 ++++++++++++----------
 .../geode/internal/cache/AfterCompletionTest.java  | 46 ++++++++++++----
 .../apache/geode/internal/cache/TXStateTest.java   | 40 ++++++++++++--
 5 files changed, 144 insertions(+), 54 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AfterCompletion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AfterCompletion.java
index a63bece..d0495f1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AfterCompletion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AfterCompletion.java
@@ -26,10 +26,14 @@ public class AfterCompletion {
 
   private boolean started;
   private boolean finished;
-  private int status = -1;
-  private boolean cancelled;
   private RuntimeException exception;
 
+  private enum Action {
+    COMMIT, ROLLBACK, CANCEL
+  };
+
+  private Action action;
+
   public synchronized void doOp(TXState txState, CancelCriterion cancelCriterion) {
     // there should be a transaction timeout that keeps this thread
     // from sitting around forever if the client goes away
@@ -39,15 +43,21 @@ public class AfterCompletion {
     try {
       waitForExecuteOrCancel(cancelCriterion);
     } catch (RuntimeException | Error ignore) {
-      cancelled = true;
+      action = Action.CANCEL;
     }
     started = true;
     logger.debug("executing afterCompletion notification");
     try {
-      if (cancelled) {
-        txState.doCleanup();
-      } else {
-        txState.doAfterCompletion(status);
+      switch (action) {
+        case CANCEL:
+          txState.doCleanup();
+          break;
+        case COMMIT:
+          txState.doAfterCompletionCommit();
+          break;
+        case ROLLBACK:
+          txState.doAfterCompletionRollback();
+          break;
       }
     } catch (RuntimeException exception) {
       this.exception = exception;
@@ -59,7 +69,7 @@ public class AfterCompletion {
   }
 
   private void waitForExecuteOrCancel(CancelCriterion cancelCriterion) {
-    waitForCondition(cancelCriterion, () -> status != -1 || cancelled);
+    waitForCondition(cancelCriterion, () -> action != null);
   }
 
   private synchronized void waitForCondition(CancelCriterion cancelCriterion,
@@ -77,12 +87,21 @@ public class AfterCompletion {
     }
   }
 
-  public synchronized void execute(int status) {
-    this.status = status;
+  public void executeCommit() {
+    executeAction(Action.COMMIT);
+  }
+
+  public void executeRollback() {
+    executeAction(Action.ROLLBACK);
+  }
+
+  private synchronized void executeAction(Action action) {
+    this.action = action;
     signalAndWaitForDoOp();
     if (exception != null) {
       throw exception;
     }
+
   }
 
   private void signalAndWaitForDoOp() {
@@ -95,7 +114,7 @@ public class AfterCompletion {
   }
 
   public synchronized void cancel() {
-    cancelled = true;
+    action = Action.CANCEL;
     signalAndWaitForDoOp();
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java
index 218fa1d..6413748 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java
@@ -57,8 +57,12 @@ public class SingleThreadJTAExecutor {
     beforeCompletion.execute(cancelCriterion);
   }
 
-  public void executeAfterCompletion(int status) {
-    afterCompletion.execute(status);
+  public void executeAfterCompletionCommit() {
+    afterCompletion.executeCommit();
+  }
+
+  public void executeAfterCompletionRollback() {
+    afterCompletion.executeRollback();
   }
 
   /**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index e799b25..4e2857b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -1111,45 +1111,54 @@ public class TXState implements TXStateInterface {
     // if there was a beforeCompletion call then there will be a thread
     // sitting in the waiting pool to execute afterCompletion. Otherwise
     // throw FailedSynchronizationException().
-    if (beforeCompletionCalled) {
-      singleThreadJTAExecutor.executeAfterCompletion(status);
+    if (wasBeforeCompletionCalled()) {
+      switch (status) {
+        case Status.STATUS_COMMITTED:
+          singleThreadJTAExecutor.executeAfterCompletionCommit();
+          break;
+        case Status.STATUS_ROLLEDBACK:
+          singleThreadJTAExecutor.executeAfterCompletionRollback();
+          break;
+        default:
+          throw new TransactionException("Unknown JTA Synchronization status " + status);
+      }
     } else {
       // rollback does not run beforeCompletion.
       if (status != Status.STATUS_ROLLEDBACK) {
         throw new FailedSynchronizationException(
             "Could not execute afterCompletion when beforeCompletion was not executed");
       }
-      doAfterCompletion(status);
+      doAfterCompletionRollback();
     }
   }
 
-  void doAfterCompletion(int status) {
+  void doAfterCompletionCommit() {
     final long opStart = CachePerfStats.getStatTime();
     try {
-      switch (status) {
-        case Status.STATUS_COMMITTED:
-          Assert.assertTrue(this.locks != null,
-              "Gemfire Transaction afterCompletion called with illegal state.");
-          try {
-            commit();
-            saveTXCommitMessageForClientFailover();
-          } catch (CommitConflictException error) {
-            Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId()
-                + " afterCompletion failed.due to CommitConflictException: " + error);
-          }
-
-          this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime, this);
-          this.locks = null;
-          break;
-        case Status.STATUS_ROLLEDBACK:
-          this.jtaLifeTime = opStart - getBeginTime();
-          rollback();
-          saveTXCommitMessageForClientFailover();
-          this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this);
-          break;
-        default:
-          Assert.assertTrue(false, "Unknown JTA Synchronization status " + status);
+      Assert.assertTrue(this.locks != null,
+          "Gemfire Transaction afterCompletion called with illegal state.");
+      try {
+        commit();
+        saveTXCommitMessageForClientFailover();
+      } catch (CommitConflictException error) {
+        Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId()
+            + " afterCompletion failed.due to CommitConflictException: " + error);
       }
+      this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime, this);
+      this.locks = null;
+
+    } catch (InternalGemFireError error) {
+      throw new TransactionException(error);
+    }
+  }
+
+  void doAfterCompletionRollback() {
+    final long opStart = CachePerfStats.getStatTime();
+    this.jtaLifeTime = opStart - getBeginTime();
+    try {
+      rollback();
+      saveTXCommitMessageForClientFailover();
+      this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this);
     } catch (InternalGemFireError error) {
       throw new TransactionException(error);
     }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/AfterCompletionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/AfterCompletionTest.java
index d12cdb3..bb0d1ad 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/AfterCompletionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/AfterCompletionTest.java
@@ -16,7 +16,6 @@ package org.apache.geode.internal.cache;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -24,8 +23,6 @@ import static org.mockito.Mockito.verify;
 
 import java.util.concurrent.TimeUnit;
 
-import javax.transaction.Status;
-
 import org.awaitility.Awaitility;
 import org.junit.Before;
 import org.junit.Test;
@@ -54,33 +51,62 @@ public class AfterCompletionTest {
   public void isStartedReturnsTrueIfExecuted() {
     startDoOp();
 
-    afterCompletion.execute(Status.STATUS_COMMITTED);
+    afterCompletion.executeCommit();
 
     verifyDoOpFinished();
     assertThat(afterCompletion.isStarted()).isTrue();
   }
 
   @Test
-  public void executeCallsDoAfterCompletion() {
+  public void executeCallsDoAfterCompletionCommit() {
+    startDoOp();
+
+    afterCompletion.executeCommit();
+    verifyDoOpFinished();
+    verify(txState, times(1)).doAfterCompletionCommit();
+  }
+
+  @Test
+  public void executeThrowsDoAfterCompletionCommitThrows() {
+    startDoOp();
+    doThrow(new RuntimeException()).when(txState).doAfterCompletionCommit();
+
+    assertThatThrownBy(() -> afterCompletion.executeCommit())
+        .isInstanceOf(RuntimeException.class);
+
+    verifyDoOpFinished();
+  }
+
+  @Test
+  public void executeCallsDoAfterCompletionRollback() {
     startDoOp();
 
-    afterCompletion.execute(Status.STATUS_COMMITTED);
+    afterCompletion.executeRollback();
     verifyDoOpFinished();
-    verify(txState, times(1)).doAfterCompletion(eq(Status.STATUS_COMMITTED));
+    verify(txState, times(1)).doAfterCompletionRollback();
   }
 
   @Test
-  public void executeThrowsDoAfterCompletionThrows() {
+  public void executeThrowsDoAfterCompletionRollbackThrows() {
     startDoOp();
-    doThrow(new RuntimeException()).when(txState).doAfterCompletion(Status.STATUS_COMMITTED);
+    doThrow(new RuntimeException()).when(txState).doAfterCompletionRollback();
 
-    assertThatThrownBy(() -> afterCompletion.execute(Status.STATUS_COMMITTED))
+    assertThatThrownBy(() -> afterCompletion.executeRollback())
         .isInstanceOf(RuntimeException.class);
 
     verifyDoOpFinished();
   }
 
   @Test
+  public void doOpInvokesDoCleanupIfCancelCriteriaThrows() {
+    doThrow(new RuntimeException()).when(cancelCriterion).checkCancelInProgress(null);
+
+    afterCompletion.doOp(txState, cancelCriterion);
+
+    verify(txState).doCleanup();
+  }
+
+  @Test
   public void cancelCallsDoCleanup() {
     startDoOp();
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
index 21fadf0..d65c361 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.catchThrowable;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
@@ -36,17 +37,20 @@ import org.apache.geode.cache.CommitConflictException;
 import org.apache.geode.cache.FailedSynchronizationException;
 import org.apache.geode.cache.SynchronizationCommitConflictException;
 import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
+import org.apache.geode.cache.TransactionException;
 
 public class TXStateTest {
   private TXStateProxyImpl txStateProxy;
   private CommitConflictException exception;
   private TransactionDataNodeHasDepartedException transactionDataNodeHasDepartedException;
+  private SingleThreadJTAExecutor executor;
 
   @Before
   public void setup() {
     txStateProxy = mock(TXStateProxyImpl.class, RETURNS_DEEP_STUBS);
     exception = new CommitConflictException("");
     transactionDataNodeHasDepartedException = new TransactionDataNodeHasDepartedException("");
+    executor = mock(SingleThreadJTAExecutor.class);
 
     when(txStateProxy.getTxMgr()).thenReturn(mock(TXManagerImpl.class));
   }
@@ -63,11 +67,10 @@ public class TXStateTest {
   @Test
   public void doAfterCompletionThrowsIfCommitFails() {
     TXState txState = spy(new TXState(txStateProxy, true));
-    doReturn(true).when(txState).wasBeforeCompletionCalled();
     txState.reserveAndCheck();
     doThrow(transactionDataNodeHasDepartedException).when(txState).commit();
 
-    assertThatThrownBy(() -> txState.doAfterCompletion(Status.STATUS_COMMITTED))
+    assertThatThrownBy(() -> txState.doAfterCompletionCommit())
         .isSameAs(transactionDataNodeHasDepartedException);
   }
 
@@ -76,8 +79,7 @@ public class TXStateTest {
     TXState txState = spy(new TXState(txStateProxy, false));
     txState.reserveAndCheck();
     txState.closed = true;
-    doReturn(true).when(txState).wasBeforeCompletionCalled();
-    txState.doAfterCompletion(Status.STATUS_COMMITTED);
+    txState.doAfterCompletionCommit();
 
     assertThat(txState.locks).isNull();
     verify(txState, times(1)).saveTXCommitMessageForClientFailover();
@@ -90,6 +92,36 @@ public class TXStateTest {
   }
 
   @Test
+  public void afterCompletionInvokesExecuteAfterCompletionCommitIfBeforeCompletionCalled() {
+    TXState txState = spy(new TXState(txStateProxy, true, executor));
+    doReturn(true).when(txState).wasBeforeCompletionCalled();
+
+    txState.afterCompletion(Status.STATUS_COMMITTED);
+
+    verify(executor, times(1)).executeAfterCompletionCommit();
+  }
+
+  @Test
+  public void afterCompletionThrowsWithUnexpectedStatusIfBeforeCompletionCalled() {
+    TXState txState = spy(new TXState(txStateProxy, true, executor));
+    doReturn(true).when(txState).wasBeforeCompletionCalled();
+
+    Throwable thrown = catchThrowable(() -> txState.afterCompletion(Status.STATUS_NO_TRANSACTION));
+
+    assertThat(thrown).isInstanceOf(TransactionException.class);
+  }
+
+  @Test
+  public void afterCompletionInvokesExecuteAfterCompletionRollbackIfBeforeCompletionCalled() {
+    TXState txState = spy(new TXState(txStateProxy, true, executor));
+    doReturn(true).when(txState).wasBeforeCompletionCalled();
+
+    txState.afterCompletion(Status.STATUS_ROLLEDBACK);
+
+    verify(executor, times(1)).executeAfterCompletionRollback();
+  }
+
+  @Test
   public void afterCompletionCanRollbackJTA() {
     TXState txState = spy(new TXState(txStateProxy, true));
     txState.afterCompletion(Status.STATUS_ROLLEDBACK);