You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2020/01/20 11:44:13 UTC
[ignite] branch master updated: IGNITE-12548 Fixed tx desync during
recovery on near node left. - Fixes #7274.
This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new b677b36c IGNITE-12548 Fixed tx desync during recovery on near node left. - Fixes #7274.
b677b36c is described below
commit b677b36cc484c683803e18b98ccb138dec2eedaf
Author: Aleksei Scherbakov <al...@gmail.com>
AuthorDate: Mon Jan 20 14:43:52 2020 +0300
IGNITE-12548 Fixed tx desync during recovery on near node left. - Fixes #7274.
Signed-off-by: Aleksei Scherbakov <as...@apache.org>
---
.../managers/communication/GridIoManager.java | 10 +-
.../distributed/GridCacheTxRecoveryFuture.java | 15 +-
.../distributed/dht/GridDhtTxFinishFuture.java | 38 ++-
.../cache/transactions/IgniteInternalTx.java | 5 +
.../cache/transactions/IgniteTxAdapter.java | 7 +-
.../cache/transactions/IgniteTxHandler.java | 34 ++-
.../cache/transactions/IgniteTxLocalAdapter.java | 7 -
.../cache/transactions/IgniteTxManager.java | 20 +-
.../ignite/internal/util/StripedExecutor.java | 10 +
.../TxRecoveryWithConcurrentRollbackTest.java | 340 +++++++++++++++++++++
.../ignite/testsuites/IgniteCacheTestSuite7.java | 3 +
11 files changed, 443 insertions(+), 46 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 9a763e1..9821edf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1398,14 +1398,16 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
return;
}
- if (plc == GridIoPolicy.SYSTEM_POOL && msg.partition() != GridIoMessage.STRIPE_DISABLED_PART) {
- ctx.getStripedExecutorService().execute(msg.partition(), c);
+ final int part = msg.partition(); // Store partition to avoid possible recalculation.
+
+ if (plc == GridIoPolicy.SYSTEM_POOL && part != GridIoMessage.STRIPE_DISABLED_PART) {
+ ctx.getStripedExecutorService().execute(part, c);
return;
}
- if (plc == GridIoPolicy.DATA_STREAMER_POOL && msg.partition() != GridIoMessage.STRIPE_DISABLED_PART) {
- ctx.getDataStreamerExecutorService().execute(msg.partition(), c);
+ if (plc == GridIoPolicy.DATA_STREAMER_POOL && part != GridIoMessage.STRIPE_DISABLED_PART) {
+ ctx.getDataStreamerExecutorService().execute(part, c);
return;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
index 1abd525..3fc053d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@ -114,24 +114,15 @@ public class GridCacheTxRecoveryFuture extends GridCacheCompoundIdentityFuture<B
UUID locNodeId = cctx.localNodeId();
- for (Map.Entry<UUID, Collection<UUID>> e : tx.transactionNodes().entrySet()) {
- if (!locNodeId.equals(e.getKey()) && !failedNodeIds.contains(e.getKey()) && !nodes.containsKey(e.getKey())) {
- ClusterNode node = cctx.discovery().node(e.getKey());
-
- if (node != null)
- nodes.put(node.id(), node);
- else if (log.isInfoEnabled())
- log.info("Transaction node left (will ignore) " + e.getKey());
- }
-
- for (UUID nodeId : e.getValue()) {
+ for (Map.Entry<UUID, Collection<UUID>> entry : tx.transactionNodes().entrySet()) {
+ for (UUID nodeId : F.concat(false, entry.getKey(), entry.getValue())) {
if (!locNodeId.equals(nodeId) && !failedNodeIds.contains(nodeId) && !nodes.containsKey(nodeId)) {
ClusterNode node = cctx.discovery().node(nodeId);
if (node != null)
nodes.put(node.id(), node);
else if (log.isInfoEnabled())
- log.info("Transaction node left (will ignore) " + e.getKey());
+ log.info("Transaction node left (will ignore) " + nodeId);
}
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 00d1588..4152993 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -23,6 +23,7 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteDiagnosticAware;
@@ -37,7 +38,12 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTx
import org.apache.ignite.internal.processors.cache.mvcc.MvccFuture;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.transactions.TxCounters;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
+import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
+import org.apache.ignite.internal.transactions.IgniteTxSerializationCheckedException;
+import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -253,6 +259,19 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
if (this.tx.syncMode() != PRIMARY_SYNC)
this.tx.sendFinishReply(finishErr);
+ if (!this.tx.txState().mvccEnabled() && !commit && shouldApplyCountersOnRollbackError(finishErr)) {
+ TxCounters txCounters = this.tx.txCounters(false);
+
+ if (txCounters != null) {
+ try {
+ cctx.tm().txHandler().applyPartitionsUpdatesCounters(txCounters.updateCounters(), true, true);
+ }
+ catch (IgniteCheckedException e0) {
+ throw new IgniteException(e0);
+ }
+ }
+ }
+
// Don't forget to clean up.
cctx.mvcc().removeFuture(futId);
@@ -264,6 +283,19 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
}
/**
+ * Checks whether partition update counters should be applied on rollback for the given finish error.
+ * @param e Finish error, or {@code null} if the rollback completed without an error.
+ * @return {@code True} if {@code e} is {@code null} or a tx rollback, timeout, optimistic or serialization error.
+ */
+ private boolean shouldApplyCountersOnRollbackError(Throwable e) {
+ return e == null ||
+ e instanceof IgniteTxRollbackCheckedException ||
+ e instanceof IgniteTxTimeoutCheckedException ||
+ e instanceof IgniteTxOptimisticCheckedException ||
+ e instanceof IgniteTxSerializationCheckedException;
+ }
+
+ /**
* @param f Future.
* @return {@code True} if mini-future.
*/
@@ -450,7 +482,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
tx.system(),
tx.ioPolicy(),
tx.isSystemInvalidate(),
- sync ? FULL_SYNC : tx.syncMode(),
+ sync || !commit ? FULL_SYNC : tx.syncMode(),
tx.completedBase(),
tx.committedVersions(),
tx.rolledbackVersions(),
@@ -483,8 +515,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
", node=" + n.id() + ']');
}
- if (sync)
- res = true;
+ if (sync || !commit)
+ res = true; // Force sync mode for rollback to prevent an issue with concurrent recovery.
else
fut.onDone();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index 7502541..4a09ade 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -290,6 +290,11 @@ public interface IgniteInternalTx {
public boolean markFinalizing(FinalizationStatus status);
/**
+ * @return Finalization status.
+ */
+ public @Nullable FinalizationStatus finalizationStatus();
+
+ /**
* @param cacheId Cache id.
* @param part Invalid partition.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 4440174..fff1214 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -631,7 +631,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
/**
* @return Finalization status.
*/
- protected FinalizationStatus finalizationStatus() {
+ @Override @Nullable public FinalizationStatus finalizationStatus() {
return finalizing;
}
@@ -2321,6 +2321,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
/** {@inheritDoc} */
+ @Nullable @Override public FinalizationStatus finalizationStatus() {
+ return null; // Finalization status is not tracked for deserialized (read-only) transactions.
+ }
+
+ /** {@inheritDoc} */
@Override public void addInvalidPartition(int cacheId, int part) {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 5eee8ed..57c210b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -1066,14 +1066,14 @@ public class IgniteTxHandler {
tx.nearFinishMiniId(req.miniId());
tx.storeEnabled(req.storeEnabled());
- if (req.commit()) {
- if (!tx.markFinalizing(USER_FINISH)) {
- if (log.isDebugEnabled())
- log.debug("Will not finish transaction (it is handled by another thread): " + tx);
+ if (!tx.markFinalizing(USER_FINISH)) {
+ if (log.isDebugEnabled())
+ log.debug("Will not finish transaction (it is handled by another thread) [commit=" + req.commit() + ", tx=" + tx + ']');
- return null;
- }
+ return null;
+ }
+ if (req.commit()) {
IgniteInternalFuture<IgniteInternalTx> commitFut = tx.commitDhtLocalAsync();
// Only for error logging.
@@ -2273,6 +2273,10 @@ public class IgniteTxHandler {
}
/**
+ * Applies partition counter updates for transactions.
+ * <p>
+ * Called after entries are written to WAL on commit or during rollback to close gaps in update counter sequence.
+ *
* @param counters Counters.
*/
public void applyPartitionsUpdatesCounters(Iterable<PartitionUpdateCountersMessage> counters)
@@ -2282,13 +2286,23 @@ public class IgniteTxHandler {
/**
* Applies partition counter updates for transactions.
+ * <p>
+ * Called after entries are written to WAL on commit or during rollback to close gaps in update counter sequence.
+ * <p>
+ * On rollback, counters should be applied on the primary only after they are applied on backup nodes. Otherwise, if
+ * the primary fails before sending rollback requests to backups, the remote transactions can be committed by the
+ * recovery protocol, and partition consistency will not be restored when the primary returns to the grid, because
+ * a RollbackRecord has already been written (relevant for persistent mode only).
*
* @param counters Counter values to be updated.
- * @param rollback {@code True} if applied from rollbacks.
+ * @param rollback {@code True} if applied during rollbacks.
+ * @param rollbackOnPrimary {@code True} if rollback happens on primary node. Passed to CQ engine.
*/
- public void applyPartitionsUpdatesCounters(Iterable<PartitionUpdateCountersMessage> counters,
+ public void applyPartitionsUpdatesCounters(
+ Iterable<PartitionUpdateCountersMessage> counters,
boolean rollback,
- boolean rollbackOnPrimary) throws IgniteCheckedException {
+ boolean rollbackOnPrimary
+ ) throws IgniteCheckedException {
if (counters == null)
return;
@@ -2349,7 +2363,7 @@ public class IgniteTxHandler {
invalid = true;
}
- if (invalid && log.isDebugEnabled()) {
+ if (log.isDebugEnabled() && invalid) {
log.debug("Received partition update counters message for invalid partition, ignoring: " +
"[cacheId=" + counter.cacheId() + ", part=" + counter.partition(i) + ']');
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index a146ad5..18ccdea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1106,13 +1106,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
}
if (DONE_FLAG_UPD.compareAndSet(this, 0, 1)) {
- if (!txState.mvccEnabled()) {
- TxCounters txCounters = txCounters(false);
-
- if (txCounters != null)
- cctx.tm().txHandler().applyPartitionsUpdatesCounters(txCounters.updateCounters(), true, true);
- }
-
cctx.tm().rollbackTx(this, clearThreadMap, skipCompletedVersions());
cctx.mvccCaching().onTxFinished(this, false);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index a6fd06c..96381a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -21,7 +21,6 @@ import java.io.Externalizable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
@@ -2187,6 +2186,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
TransactionState state = tx.state();
if (state == PREPARED || state == COMMITTING || state == COMMITTED) {
+ if (state == PREPARED)
+ tx.markFinalizing(RECOVERY_FINISH); // Prevents concurrent rollback.
+
if (--txNum == 0) {
if (fut != null)
fut.onDone(true);
@@ -2195,7 +2197,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
}
else {
- if (tx.state(MARKED_ROLLBACK) || tx.state() == UNKNOWN) {
+ if (tx.setRollbackOnly() || tx.state() == UNKNOWN) {
tx.rollbackAsync();
if (log.isDebugEnabled())
@@ -2232,7 +2234,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
if (processedVers == null)
- processedVers = new HashSet<>(txNum, 1.0f);
+ processedVers = U.newHashSet(txNum);
processedVers.add(tx.xidVersion());
}
@@ -2281,12 +2283,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (log.isInfoEnabled())
log.info("Finishing prepared transaction [commit=" + commit + ", tx=" + tx + ']');
- if (!tx.markFinalizing(RECOVERY_FINISH)) {
- if (log.isInfoEnabled())
- log.info("Will not try to commit prepared transaction (could not mark finalized): " + tx);
-
- return;
- }
+ // Transactions participating in recovery can be finished only by recovery consensus.
+ assert tx.finalizationStatus() == RECOVERY_FINISH : tx;
if (tx instanceof IgniteTxRemoteEx) {
IgniteTxRemoteEx rmtTx = (IgniteTxRemoteEx)tx;
@@ -2333,6 +2331,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
assert !F.isEmpty(tx.transactionNodes()) : tx;
assert tx.nearXidVersion() != null : tx;
+ // Transaction will be completed by finish message.
+ if (!tx.markFinalizing(RECOVERY_FINISH))
+ return;
+
GridCacheTxRecoveryFuture fut = new GridCacheTxRecoveryFuture(
cctx,
tx,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index bf5220a..8ed4d7a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -296,6 +296,16 @@ public class StripedExecutor implements ExecutorService {
}
/**
+ * @param idx Stripe index; must be non-negative and is mapped to a stripe modulo the number of stripes.
+ * @return Queue size of the selected stripe.
+ */
+ public int queueStripeSize(int idx) {
+ A.ensure(idx >= 0, "Stripe index should be non-negative: " + idx);
+
+ return stripes[idx % stripes.length].queueSize();
+ }
+
+ /**
* @return Completed tasks count.
*/
public long completedTasks() {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRecoveryWithConcurrentRollbackTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRecoveryWithConcurrentRollbackTest.java
new file mode 100644
index 0000000..9886bad
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRecoveryWithConcurrentRollbackTest.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionState;
+import org.junit.Test;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+
+/**
+ */
+public class TxRecoveryWithConcurrentRollbackTest extends GridCommonAbstractTest {
+ /** Backups. */
+ private int backups;
+
+ /** Persistence. */
+ private boolean persistence;
+
+ /** Sync mode. */
+ private CacheWriteSynchronizationMode syncMode;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
+ final IgniteConfiguration cfg = super.getConfiguration(name);
+
+ cfg.setConsistentId(name);
+
+ if (persistence) {
+ cfg.setDataStorageConfiguration(
+ new DataStorageConfiguration().
+ setWalSegmentSize(4 * 1024 * 1024).
+ setWalHistorySize(1000).
+ setCheckpointFrequency(Integer.MAX_VALUE).
+ setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration().setPersistenceEnabled(true).setMaxSize(50 * 1024 * 1024)));
+ }
+
+ cfg.setActiveOnStart(false);
+ cfg.setClientMode("client".equals(name));
+ cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+ cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME).
+ setCacheMode(PARTITIONED).
+ setBackups(backups).
+ setAtomicityMode(TRANSACTIONAL).
+ setWriteSynchronizationMode(syncMode));
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /**
+ * The test enforces a specific order of message processing during concurrent tx rollback and tx recovery caused by
+ * the near node leaving.
+ * <p>
+ * Expected result: both DHT transactions finish in the same state.
+ */
+ @Test
+ public void testRecoveryNotBreakingTxAtomicityOnNearFail() throws Exception {
+ backups = 1;
+ persistence = false;
+
+ final IgniteEx node0 = startGrids(3);
+ node0.cluster().active(true);
+
+ final Ignite client = startGrid("client");
+
+ final IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+ final List<Integer> g0Keys = primaryKeys(grid(0).cache(DEFAULT_CACHE_NAME), 100);
+ final List<Integer> g1Keys = primaryKeys(grid(1).cache(DEFAULT_CACHE_NAME), 100);
+
+ final List<Integer> g2BackupKeys = backupKeys(grid(2).cache(DEFAULT_CACHE_NAME), 100, 0);
+
+ Integer k1 = null;
+ Integer k2 = null;
+
+ for (Integer key : g2BackupKeys) {
+ if (g0Keys.contains(key))
+ k1 = key;
+ else if (g1Keys.contains(key))
+ k2 = key;
+
+ if (k1 != null && k2 != null)
+ break;
+ }
+
+ assertNotNull(k1);
+ assertNotNull(k2);
+
+ List<IgniteInternalTx> txs0 = null;
+ List<IgniteInternalTx> txs1 = null;
+
+ CountDownLatch stripeBlockLatch = new CountDownLatch(1);
+
+ int[] stripeHolder = new int[1];
+
+ try (final Transaction tx = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
+ cache.put(k1, Boolean.TRUE);
+ cache.put(k2, Boolean.TRUE);
+
+ TransactionProxyImpl p = (TransactionProxyImpl)tx;
+ p.tx().prepare(true);
+
+ txs0 = txs(grid(0));
+ txs1 = txs(grid(1));
+ List<IgniteInternalTx> txs2 = txs(grid(2));
+
+ assertEquals(1, txs0.size());
+ assertEquals(1, txs1.size());
+ assertEquals(2, txs2.size());
+
+ // Prevent the recovery request for the grid1 tx branch from going to grid0.
+ TestRecordingCommunicationSpi.spi(grid(1)).blockMessages(GridCacheTxRecoveryRequest.class, grid(0).name());
+ // Prevent finish(false) request processing on node0.
+ TestRecordingCommunicationSpi.spi(client).blockMessages(GridNearTxFinishRequest.class, grid(0).name());
+
+ int stripe = U.safeAbs(p.tx().xidVersion().hashCode());
+
+ stripeHolder[0] = stripe;
+
+ // Blocks stripe processing for the rollback request on node1.
+ grid(1).context().getStripedExecutorService().execute(stripe, () -> U.awaitQuiet(stripeBlockLatch));
+ // Dummy task to ensure the message is processed.
+ grid(1).context().getStripedExecutorService().execute(stripe, () -> {});
+
+ runAsync(() -> {
+ TestRecordingCommunicationSpi.spi(client).waitForBlocked();
+
+ client.close();
+
+ return null;
+ });
+
+ tx.rollback();
+
+ fail();
+ }
+ catch (Exception ignored) {
+ // Expected.
+ }
+
+ // Wait until tx0 is committed by recovery on node0.
+ assertNotNull(txs0);
+ try {
+ txs0.get(0).finishFuture().get(3_000);
+ }
+ catch (IgniteFutureTimeoutCheckedException e) {
+ // A timeout means the recovery message from g0 to g1 is mapped to the same stripe as the near finish request.
+ // Release the latch to allow sequential processing.
+ stripeBlockLatch.countDown();
+
+ // Wait until sequential processing is finished.
+ assertTrue("sequential processing", GridTestUtils.waitForCondition(() ->
+ grid(1).context().getStripedExecutorService().queueStripeSize(stripeHolder[0]) == 0, 5_000));
+
+ // Unblock the recovery message from g1 to g0: tx is in RECOVERY_FINISH state and waits for recovery end.
+ TestRecordingCommunicationSpi.spi(grid(1)).stopBlock();
+
+ txs0.get(0).finishFuture().get();
+ txs1.get(0).finishFuture().get();
+
+ final TransactionState s1 = txs0.get(0).state();
+ final TransactionState s2 = txs1.get(0).state();
+
+ assertEquals(s1, s2);
+
+ return;
+ }
+
+ // Release rollback request processing, triggering an attempt to roll back the transaction during recovery.
+ stripeBlockLatch.countDown();
+
+ // Wait until the finish message is processed.
+ assertTrue("concurrent processing", GridTestUtils.waitForCondition(() ->
+ grid(1).context().getStripedExecutorService().queueStripeSize(stripeHolder[0]) == 0, 5_000));
+
+ // Proceed with recovery on grid1 -> grid0. Tx0 is committed, so tx1 should also be committed.
+ TestRecordingCommunicationSpi.spi(grid(1)).stopBlock();
+
+ assertNotNull(txs1);
+ txs1.get(0).finishFuture().get();
+
+ final TransactionState s1 = txs0.get(0).state();
+ final TransactionState s2 = txs1.get(0).state();
+
+ assertEquals(s1, s2);
+ }
+
+ /** Tests near and primary node failure with FULL_SYNC write synchronization mode. */
+ @Test
+ public void testRecoveryNotBreakingTxAtomicityOnNearAndPrimaryFail_FULL_SYNC() throws Exception {
+ doTestRecoveryNotBreakingTxAtomicityOnNearAndPrimaryFail(FULL_SYNC);
+ }
+
+ /** Tests near and primary node failure with PRIMARY_SYNC write synchronization mode. */
+ @Test
+ public void testRecoveryNotBreakingTxAtomicityOnNearAndPrimaryFail_PRIMARY_SYNC() throws Exception {
+ doTestRecoveryNotBreakingTxAtomicityOnNearAndPrimaryFail(PRIMARY_SYNC);
+ }
+
+ /** Tests near and primary node failure with FULL_ASYNC write synchronization mode. */
+ @Test
+ public void testRecoveryNotBreakingTxAtomicityOnNearAndPrimaryFail_FULL_ASYNC() throws Exception {
+ doTestRecoveryNotBreakingTxAtomicityOnNearAndPrimaryFail(FULL_ASYNC);
+ }
+
+ /**
+ * Stops near and primary nodes while primary-to-backup DHT rollback requests are blocked (persistence enabled).
+ * Expected result: after the primary node is restarted, all partitions are consistent.
+ * @param syncMode Cache write synchronization mode.
+ */
+ private void doTestRecoveryNotBreakingTxAtomicityOnNearAndPrimaryFail(CacheWriteSynchronizationMode syncMode)
+ throws Exception {
+ backups = 2;
+ persistence = true;
+ this.syncMode = syncMode;
+
+ final IgniteEx node0 = startGrids(3);
+ node0.cluster().active(true);
+
+ final Ignite client = startGrid("client");
+
+ final IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+ final Integer pk = primaryKey(grid(1).cache(DEFAULT_CACHE_NAME));
+
+ IgniteInternalFuture<Void> fut = null;
+
+ List<IgniteInternalTx> tx0 = null;
+ List<IgniteInternalTx> tx2 = null;
+
+ try (final Transaction tx = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
+ cache.put(pk, Boolean.TRUE);
+
+ TransactionProxyImpl p = (TransactionProxyImpl)tx;
+ p.tx().prepare(true);
+
+ tx0 = txs(grid(0));
+ tx2 = txs(grid(2));
+
+ TestRecordingCommunicationSpi.spi(grid(1)).blockMessages((node, msg) -> msg instanceof GridDhtTxFinishRequest);
+
+ fut = runAsync(() -> {
+ TestRecordingCommunicationSpi.spi(grid(1)).waitForBlocked(2);
+
+ client.close();
+ grid(1).close();
+
+ return null;
+ });
+
+ tx.rollback();
+ }
+ catch (Exception ignored) {
+ // No-op: the near node may be stopped while the rollback is in progress.
+ }
+
+ assertNotNull(fut);
+ fut.get();
+
+ // Wait until the remaining backup transactions are finished.
+ tx0.get(0).finishFuture().get();
+ tx2.get(0).finishFuture().get();
+
+ assertPartitionsSame(idleVerify(grid(0), DEFAULT_CACHE_NAME));
+
+ // Restart the former primary and verify that partitions stay consistent after it rejoins.
+ startGrid(1);
+
+ awaitPartitionMapExchange();
+
+ assertPartitionsSame(idleVerify(grid(0), DEFAULT_CACHE_NAME));
+ }
+
+ /**
+ * @param g Grid.
+ */
+ private List<IgniteInternalTx> txs(IgniteEx g) {
+ return new ArrayList<>(g.context().cache().context().tm().activeTransactions());
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
index e135c64..12d9162 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.CheckpointBuff
import org.apache.ignite.internal.processors.cache.transactions.TransactionIntegrityWithPrimaryIndexCorruptionTest;
import org.apache.ignite.internal.processors.cache.transactions.TxCrossCacheMapOnInvalidTopologyTest;
import org.apache.ignite.internal.processors.cache.transactions.TxCrossCacheRemoteMultiplePartitionReservationTest;
+import org.apache.ignite.internal.processors.cache.transactions.TxRecoveryWithConcurrentRollbackTest;
import org.apache.ignite.internal.processors.cache.transactions.TxRollbackAsyncWithPersistenceTest;
import org.apache.ignite.internal.processors.cache.transactions.TxWithSmallTimeoutAndContentionOneKeyTest;
import org.apache.ignite.testframework.GridTestUtils;
@@ -123,6 +124,8 @@ public class IgniteCacheTestSuite7 {
GridTestUtils.addTestIfNeeded(suite, SafeLogTxFinishErrorTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, TxRecoveryWithConcurrentRollbackTest.class, ignoredTests);
+
return suite;
}
}