You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2018/04/05 11:17:23 UTC
ignite git commit: IGNITE-7481: Fix rollback on timeout for suspended
transactions - Fixes #3514.
Repository: ignite
Updated Branches:
refs/heads/master 5dde444ce -> 5d6c8018d
IGNITE-7481: Fix rollback on timeout for suspended transactions - Fixes #3514.
Signed-off-by: Nikolay Izhikov <ni...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5d6c8018
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5d6c8018
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5d6c8018
Branch: refs/heads/master
Commit: 5d6c8018dbd146d3f1611999803c66cc56e4476e
Parents: 5dde444
Author: voipp <al...@gmail.com>
Authored: Thu Apr 5 14:04:02 2018 +0300
Committer: Nikolay Izhikov <ni...@apache.org>
Committed: Thu Apr 5 14:12:25 2018 +0300
----------------------------------------------------------------------
.../cache/distributed/near/GridNearTxLocal.java | 23 +++++-
.../transactions/IgniteTxLocalAdapter.java | 12 ++-
...cheTransactionalAbstractMetricsSelfTest.java | 79 +++++++++++++++++++-
3 files changed, 111 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d6c8018/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index a3fddaf..7785605 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -34,6 +34,7 @@ import javax.cache.CacheException;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
@@ -113,6 +114,7 @@ import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK;
import static org.apache.ignite.transactions.TransactionState.PREPARING;
import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK;
+import static org.apache.ignite.transactions.TransactionState.SUSPENDED;
import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
/**
@@ -2885,6 +2887,16 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
* @throws IgniteCheckedException If the transaction is in an incorrect state, or timed out.
*/
public void resume() throws IgniteCheckedException {
+ resume(true);
+ }
+
+ /**
+ * Resumes transaction (possibly in another thread) if it was previously suspended.
+ *
+ * @param checkTimeout Whether timeout should be checked.
+ * @throws IgniteCheckedException If the transaction is in an incorrect state, or timed out.
+ */
+ private void resume(boolean checkTimeout) throws IgniteCheckedException {
if (log.isDebugEnabled())
log.debug("Resume near local tx: " + this);
@@ -2892,7 +2904,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
throw new UnsupportedOperationException("Resume is not supported for pessimistic transactions.");
synchronized (this) {
- checkValid();
+ checkValid(checkTimeout);
cctx.tm().resumeTx(this);
}
@@ -4121,6 +4133,15 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
/** {@inheritDoc} */
@Override public void onTimeout() {
+ if (state() == SUSPENDED) {
+ try {
+ resume(false);
+ }
+ catch (IgniteCheckedException e) {
+ log.warning("Error resuming suspended transaction on timeout: " + this, e);
+ }
+ }
+
if (state(MARKED_ROLLBACK, true) || (state() == MARKED_ROLLBACK)) {
cctx.kernalContext().closure().runLocalSafe(new Runnable() {
@Override public void run() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d6c8018/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index dac4e09..c2a2842 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1292,7 +1292,17 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
* @throws IgniteCheckedException If transaction check failed.
*/
protected void checkValid() throws IgniteCheckedException {
- if (local() && !dht() && remainingTime() == -1)
+ checkValid(true);
+ }
+
+ /**
+ * Checks transaction expiration.
+ *
+ * @param checkTimeout Whether timeout should be checked.
+ * @throws IgniteCheckedException If transaction check failed.
+ */
+ protected void checkValid(boolean checkTimeout) throws IgniteCheckedException {
+ if (local() && !dht() && remainingTime() == -1 && checkTimeout)
state(MARKED_ROLLBACK, true);
if (isRollbackOnly()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d6c8018/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTransactionalAbstractMetricsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTransactionalAbstractMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTransactionalAbstractMetricsSelfTest.java
index 0ab5bc4..c445879 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTransactionalAbstractMetricsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTransactionalAbstractMetricsSelfTest.java
@@ -18,8 +18,10 @@
package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
@@ -30,6 +32,7 @@ import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
/**
* Transactional cache metrics test.
@@ -38,6 +41,9 @@ public abstract class GridCacheTransactionalAbstractMetricsSelfTest extends Grid
/** */
private static final int TX_CNT = 3;
+ /** Transaction timeout. */
+ private static final long TX_TIMEOUT = 500L;
+
/**
* @throws Exception If failed.
*/
@@ -207,6 +213,27 @@ public abstract class GridCacheTransactionalAbstractMetricsSelfTest extends Grid
}
/**
+ * @throws Exception If failed.
+ */
+ public void testOptimisticSuspendedReadCommittedTxTimeoutRollbacks() throws Exception {
+ doTestSuspendedTxTimeoutRollbacks(OPTIMISTIC, READ_COMMITTED);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testOptimisticSuspendedRepeatableReadTxTimeoutRollbacks() throws Exception {
+ doTestSuspendedTxTimeoutRollbacks(OPTIMISTIC, REPEATABLE_READ);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testOptimisticSuspendedSerializableTxTimeoutRollbacks() throws Exception {
+ doTestSuspendedTxTimeoutRollbacks(OPTIMISTIC, SERIALIZABLE);
+ }
+
+ /**
* @param concurrency Concurrency control.
* @param isolation Isolation level.
* @param put Put some data if {@code true}.
@@ -299,4 +326,54 @@ public abstract class GridCacheTransactionalAbstractMetricsSelfTest extends Grid
}
}
}
-}
\ No newline at end of file
+
+ /**
+ * Metrics test for transaction timeout rollback.
+ *
+ * @param concurrency Concurrency control.
+ * @param isolation Isolation level.
+ * @throws Exception If failed.
+ */
+ private void doTestSuspendedTxTimeoutRollbacks(TransactionConcurrency concurrency, TransactionIsolation isolation)
+ throws Exception {
+ IgniteCache<Integer, Integer> cache = grid(0).cache(DEFAULT_CACHE_NAME);
+
+ IgniteTransactions transactions = grid(0).transactions();
+
+ for (int i = 0; i < TX_CNT; i++) {
+ Transaction tx = transactions.txStart(concurrency, isolation, TX_TIMEOUT, 0);
+
+ cache.put(1, 1);
+
+ tx.suspend();
+
+ boolean res = GridTestUtils.waitForCondition(() -> tx.state() == ROLLED_BACK, TX_TIMEOUT * 10);
+
+ assertTrue(res);
+
+ tx.close();
+ }
+
+ TransactionMetrics txMetrics = transactions.metrics();
+ CacheMetrics cacheMetrics = cache.localMetrics();
+
+ assertEquals(0, txMetrics.txCommits());
+ assertEquals(0, cacheMetrics.getCacheTxCommits());
+
+ assertEquals(TX_CNT, txMetrics.txRollbacks());
+ assertEquals(TX_CNT, cacheMetrics.getCacheTxRollbacks());
+
+ assertTrue(cacheMetrics.getAverageTxRollbackTime() > 0);
+
+ for (int i = 1; i < gridCount(); i++) {
+ txMetrics = grid(i).transactions().metrics();
+ cacheMetrics = grid(i).cache(DEFAULT_CACHE_NAME).localMetrics();
+
+ assertEquals(0, txMetrics.txCommits());
+ assertEquals(0, cacheMetrics.getCacheTxCommits());
+
+ assertEquals(0, txMetrics.txRollbacks());
+ assertEquals(0, cacheMetrics.getCacheTxRollbacks());
+ }
+ }
+}