You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2018/04/05 11:17:23 UTC

ignite git commit: IGNITE-7481: Fix rollback on timeout for suspended transactions - Fixes #3514.

Repository: ignite
Updated Branches:
  refs/heads/master 5dde444ce -> 5d6c8018d


IGNITE-7481: Fix rollback on timeout for suspended transactions - Fixes #3514.

Signed-off-by: Nikolay Izhikov <ni...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5d6c8018
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5d6c8018
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5d6c8018

Branch: refs/heads/master
Commit: 5d6c8018dbd146d3f1611999803c66cc56e4476e
Parents: 5dde444
Author: voipp <al...@gmail.com>
Authored: Thu Apr 5 14:04:02 2018 +0300
Committer: Nikolay Izhikov <ni...@apache.org>
Committed: Thu Apr 5 14:12:25 2018 +0300

----------------------------------------------------------------------
 .../cache/distributed/near/GridNearTxLocal.java | 23 +++++-
 .../transactions/IgniteTxLocalAdapter.java      | 12 ++-
 ...cheTransactionalAbstractMetricsSelfTest.java | 79 +++++++++++++++++++-
 3 files changed, 111 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5d6c8018/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index a3fddaf..7785605 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -34,6 +34,7 @@ import javax.cache.CacheException;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
@@ -113,6 +114,7 @@ import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK;
 import static org.apache.ignite.transactions.TransactionState.PREPARING;
 import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
 import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK;
+import static org.apache.ignite.transactions.TransactionState.SUSPENDED;
 import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
 
 /**
@@ -2885,6 +2887,16 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
      * @throws IgniteCheckedException If the transaction is in an incorrect state, or timed out.
      */
     public void resume() throws IgniteCheckedException {
+        resume(true);
+    }
+
+    /**
+     * Resumes transaction (possibly in another thread) if it was previously suspended.
+     *
+     * @param checkTimeout Whether timeout should be checked.
+     * @throws IgniteCheckedException If the transaction is in an incorrect state, or timed out.
+     */
+    private void resume(boolean checkTimeout) throws IgniteCheckedException {
         if (log.isDebugEnabled())
             log.debug("Resume near local tx: " + this);
 
@@ -2892,7 +2904,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
             throw new UnsupportedOperationException("Resume is not supported for pessimistic transactions.");
 
         synchronized (this) {
-            checkValid();
+            checkValid(checkTimeout);
 
             cctx.tm().resumeTx(this);
         }
@@ -4121,6 +4133,15 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
 
     /** {@inheritDoc} */
     @Override public void onTimeout() {
+        if (state() == SUSPENDED) {
+            try {
+                resume(false);
+            }
+            catch (IgniteCheckedException e) {
+                log.warning("Error resuming suspended transaction on timeout: " + this, e);
+            }
+        }
+
         if (state(MARKED_ROLLBACK, true) || (state() == MARKED_ROLLBACK)) {
             cctx.kernalContext().closure().runLocalSafe(new Runnable() {
                 @Override public void run() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d6c8018/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index dac4e09..c2a2842 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1292,7 +1292,17 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
      * @throws IgniteCheckedException If transaction check failed.
      */
     protected void checkValid() throws IgniteCheckedException {
-        if (local() && !dht() && remainingTime() == -1)
+        checkValid(true);
+    }
+
+    /**
+     * Checks transaction validity, marking an expired transaction for rollback only if {@code checkTimeout} is set.
+     *
+     * @param checkTimeout Whether timeout should be checked.
+     * @throws IgniteCheckedException If transaction check failed.
+     */
+    protected void checkValid(boolean checkTimeout) throws IgniteCheckedException {
+        if (local() && !dht() && remainingTime() == -1 && checkTimeout)
             state(MARKED_ROLLBACK, true);
 
         if (isRollbackOnly()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d6c8018/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTransactionalAbstractMetricsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTransactionalAbstractMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTransactionalAbstractMetricsSelfTest.java
index 0ab5bc4..c445879 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTransactionalAbstractMetricsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTransactionalAbstractMetricsSelfTest.java
@@ -18,8 +18,10 @@
 package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteTransactions;
 import org.apache.ignite.cache.CacheMetrics;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
@@ -30,6 +32,7 @@ import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
 import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
 
 /**
  * Transactional cache metrics test.
@@ -38,6 +41,9 @@ public abstract class GridCacheTransactionalAbstractMetricsSelfTest extends Grid
     /** */
     private static final int TX_CNT = 3;
 
+    /** Transaction timeout. */
+    private static final long TX_TIMEOUT = 500L;
+
     /**
      * @throws Exception If failed.
      */
@@ -207,6 +213,27 @@ public abstract class GridCacheTransactionalAbstractMetricsSelfTest extends Grid
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testOptimisticSuspendedReadCommittedTxTimeoutRollbacks() throws Exception {
+        doTestSuspendedTxTimeoutRollbacks(OPTIMISTIC, READ_COMMITTED);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOptimisticSuspendedRepeatableReadTxTimeoutRollbacks() throws Exception {
+        doTestSuspendedTxTimeoutRollbacks(OPTIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOptimisticSuspendedSerializableTxTimeoutRollbacks() throws Exception {
+        doTestSuspendedTxTimeoutRollbacks(OPTIMISTIC, SERIALIZABLE);
+    }
+
+    /**
      * @param concurrency Concurrency control.
      * @param isolation Isolation level.
      * @param put Put some data if {@code true}.
@@ -299,4 +326,54 @@ public abstract class GridCacheTransactionalAbstractMetricsSelfTest extends Grid
             }
         }
     }
-}
\ No newline at end of file
+
+    /**
+     * Metrics test for rollback of suspended transactions on timeout.
+     *
+     * @param concurrency Concurrency control.
+     * @param isolation Isolation level.
+     * @throws Exception If failed.
+     */
+    private void doTestSuspendedTxTimeoutRollbacks(TransactionConcurrency concurrency, TransactionIsolation isolation)
+        throws Exception {
+        IgniteCache<Integer, Integer> cache = grid(0).cache(DEFAULT_CACHE_NAME);
+
+        IgniteTransactions transactions = grid(0).transactions();
+
+        for (int i = 0; i < TX_CNT; i++) {
+            Transaction tx = transactions.txStart(concurrency, isolation, TX_TIMEOUT, 0);
+
+            cache.put(1, 1);
+
+            tx.suspend();
+
+            boolean res = GridTestUtils.waitForCondition(() -> tx.state() == ROLLED_BACK, TX_TIMEOUT * 10);
+
+            assertTrue(res);
+
+            tx.close();
+        }
+
+        TransactionMetrics txMetrics = transactions.metrics();
+        CacheMetrics cacheMetrics = cache.localMetrics();
+
+        assertEquals(0, txMetrics.txCommits());
+        assertEquals(0, cacheMetrics.getCacheTxCommits());
+
+        assertEquals(TX_CNT, txMetrics.txRollbacks());
+        assertEquals(TX_CNT, cacheMetrics.getCacheTxRollbacks());
+
+        assertTrue(cacheMetrics.getAverageTxRollbackTime() > 0);
+
+        for (int i = 1; i < gridCount(); i++) {
+            txMetrics = grid(i).transactions().metrics();
+            cacheMetrics = grid(i).cache(DEFAULT_CACHE_NAME).localMetrics();
+
+            assertEquals(0, txMetrics.txCommits());
+            assertEquals(0, cacheMetrics.getCacheTxCommits());
+
+            assertEquals(0, txMetrics.txRollbacks());
+            assertEquals(0, cacheMetrics.getCacheTxRollbacks());
+        }
+    }
+}