You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2021/04/29 07:03:06 UTC
[ignite] branch master updated: IGNITE-6324 Filtering uncommited tx
WAL records to prevent restoring tx partially - Fixes #8987.
This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new d0da231 IGNITE-6324 Filtering uncommited tx WAL records to prevent restoring tx partially - Fixes #8987.
d0da231 is described below
commit d0da231c7e8891d9fd45be3a245612e41b464e60
Author: zstan <st...@gmail.com>
AuthorDate: Thu Apr 29 09:56:07 2021 +0300
IGNITE-6324 Filtering uncommited tx WAL records to prevent restoring tx partially - Fixes #8987.
Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
.../internal/pagemem/wal/record/DataEntry.java | 1 +
.../internal/pagemem/wal/record/WALRecord.java | 3 +-
.../GridCacheDatabaseSharedManager.java | 33 +-
.../persistence/checkpoint/CheckpointManager.java | 4 +-
.../cache/persistence/checkpoint/Checkpointer.java | 13 +-
.../cache/transactions/IgniteTxAdapter.java | 17 +-
.../cache/transactions/IgniteTxLocalAdapter.java | 11 +-
.../cache/transactions/IgniteTxManager.java | 66 +++-
.../persistence/db/IgniteLogicalRecoveryTest.java | 2 +-
.../db/IgniteLogicalRecoveryWithParamsTest.java | 368 +++++++++++++++++++++
.../IgnitePdsWithIndexingCoreTestSuite.java | 2 +
11 files changed, 487 insertions(+), 33 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
index dd05726..73de55b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
@@ -44,6 +44,7 @@ public class DataEntry {
protected GridCacheOperation op;
/** Near transaction version. */
+ @GridToStringInclude
protected GridCacheVersion nearXidVer;
/** Write version. */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
index f07b71a..8f49e5d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
@@ -342,7 +342,8 @@ public abstract class WALRecord {
PHYSICAL,
/**
* Logical records are needed to replay logical updates since last checkpoint.
- * {@link GridCacheDatabaseSharedManager#applyLogicalUpdates(CheckpointStatus, org.apache.ignite.lang.IgnitePredicate, org.apache.ignite.lang.IgniteBiPredicate, boolean)}
+ * {@link GridCacheDatabaseSharedManager#applyLogicalUpdates(CheckpointStatus, org.apache.ignite.lang.IgnitePredicate,
+ * org.apache.ignite.lang.IgniteBiPredicate, boolean)}
*/
LOGICAL,
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index e6ca08c..42ff08d 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -93,6 +93,7 @@ import org.apache.ignite.internal.pagemem.wal.record.MvccTxRecord;
import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
import org.apache.ignite.internal.pagemem.wal.record.ReencryptionStartRecord;
import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord;
+import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.pagemem.wal.record.WalRecordCacheGroupAware;
import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord;
@@ -137,6 +138,7 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.compress.CompressionProcessor;
import org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty;
import org.apache.ignite.internal.processors.port.GridPortRecord;
@@ -183,6 +185,7 @@ import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.MASTER_KEY_CHANGE_RECORD;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.MASTER_KEY_CHANGE_RECORD_V2;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.METASTORE_DATA_RECORD;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.TX_RECORD;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.fromOrdinal;
import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
@@ -839,7 +842,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
metaStorage = createMetastorage(true);
- applyLogicalUpdates(status, onlyMetastorageGroup(), onlyMetastorageAndEncryptionRecords(), false);
+ applyLogicalUpdates(status, onlyMetastorageGroup(), onlyMetastorageAndEncryptionRecords(), true);
fillWalDisabledGroups();
@@ -1945,9 +1948,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
status,
groupsWithEnabledWal(),
logicalRecords(),
- true
+ false
);
+ cctx.tm().clearUncommitedStates();
+
if (recoveryVerboseLogging && log.isInfoEnabled()) {
log.info("Partition states information after LOGICAL RECOVERY phase:");
@@ -2598,6 +2603,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/**
* @param status Last registered checkpoint status.
+ * @param restoreMeta Metastore restore phase if {@code true}.
* @throws IgniteCheckedException If failed to apply updates.
* @throws StorageException If IO exception occurred while reading write-ahead log.
*/
@@ -2605,13 +2611,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
CheckpointStatus status,
IgnitePredicate<Integer> cacheGroupsPredicate,
IgniteBiPredicate<WALRecord.RecordType, WALPointer> recordTypePredicate,
- boolean skipFieldLookup
+ boolean restoreMeta
) throws IgniteCheckedException {
if (log.isInfoEnabled())
- log.info("Applying lost cache updates since last checkpoint record [lastMarked="
+ log.info("Applying lost " + (restoreMeta ? "metastore" : "cache") + " updates since last checkpoint record [lastMarked="
+ status.startPtr + ", lastCheckpointId=" + status.cpStartId + ']');
- if (skipFieldLookup)
+ if (!restoreMeta)
cctx.kernalContext().query().skipFieldLookup(true);
long start = U.currentTimeMillis();
@@ -2633,6 +2639,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
RestoreLogicalState restoreLogicalState =
new RestoreLogicalState(status, it, lastArchivedSegment, cacheGroupsPredicate, partitionRecoveryStates);
+ final IgniteTxManager txManager = cctx.tm();
+
try {
while (it.hasNextX()) {
WALRecord rec = restoreLogicalState.next();
@@ -2641,6 +2649,14 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
break;
switch (rec.type()) {
+ case TX_RECORD:
+ if (restoreMeta) { // Also restore tx states.
+ TxRecord txRec = (TxRecord)rec;
+
+ txManager.collectTxStates(txRec);
+ }
+
+ break;
case CHECKPOINT_RECORD: // Calculate initial partition states
CheckpointRecord cpRec = (CheckpointRecord)rec;
@@ -2682,6 +2698,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
DataRecord dataRec = (DataRecord)rec;
for (DataEntry dataEntry : dataRec.writeEntries()) {
+ if (!restoreMeta && txManager.uncommitedTx(dataEntry))
+ continue;
+
int cacheId = dataEntry.cacheId();
DynamicCacheDescriptor cacheDesc = cctx.cache().cacheDescriptor(cacheId);
@@ -2777,7 +2796,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
finally {
it.close();
- if (skipFieldLookup)
+ if (!restoreMeta)
cctx.kernalContext().query().skipFieldLookup(false);
}
@@ -3465,7 +3484,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
* @return WAL records predicate that passes only Metastorage and encryption data records.
*/
private IgniteBiPredicate<WALRecord.RecordType, WALPointer> onlyMetastorageAndEncryptionRecords() {
- return (type, ptr) -> type == METASTORE_DATA_RECORD ||
+ return (type, ptr) -> type == METASTORE_DATA_RECORD || type == TX_RECORD ||
type == MASTER_KEY_CHANGE_RECORD || type == MASTER_KEY_CHANGE_RECORD_V2;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java
index 73b05d6..3d8c287 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java
@@ -327,9 +327,7 @@ public class CheckpointManager {
checkpointMarkersStorage.cleanupCheckpointDirectory();
}
- /**
- *
- */
+ /** Current checkpointer implementation. */
public Checkpointer getCheckpointer() {
return checkpointer;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
index 1424a44..a7ac809 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
@@ -120,7 +120,7 @@ public class Checkpointer extends GridWorker {
private static final long PARTITION_DESTROY_CHECKPOINT_TIMEOUT = 30 * 1000; // 30 Seconds.
/** Avoid the start checkpoint if checkpointer was canceled. */
- private final boolean skipCheckpointOnNodeStop = getBoolean(IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP, false);
+ private volatile boolean skipCheckpointOnNodeStop = getBoolean(IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP, false);
/** Long JVM pause threshold. */
private final int longJvmPauseThreshold =
@@ -244,7 +244,7 @@ public class Checkpointer extends GridWorker {
checkpointWritePageThreads,
checkpointWritePageThreads,
30_000,
- new LinkedBlockingQueue<Runnable>()
+ new LinkedBlockingQueue<>()
);
return null;
@@ -1007,4 +1007,13 @@ public class Checkpointer extends GridWorker {
private boolean isShutdownNow() {
return shutdownNow;
}
+
+ /**
+ * Skip checkpoint on node stop.
+ *
+ * @param skip If {@code true} skips checkpoint on node stop.
+ */
+ public void skipCheckpointOnNodeStop(boolean skip) {
+ skipCheckpointOnNodeStop = skip;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 9daddb5..04db6cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -63,7 +63,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.Grid
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
-import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry;
@@ -1228,7 +1227,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
seal();
if (state == PREPARED || state == COMMITTED || state == ROLLED_BACK) {
- cctx.tm().setMvccState(this, toMvccState(state));
+ cctx.tm().setMvccState(this, state);
ptr = cctx.tm().logTxRecord(this);
}
@@ -1260,20 +1259,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
/** */
- private byte toMvccState(TransactionState state) {
- switch (state) {
- case PREPARED:
- return TxState.PREPARED;
- case COMMITTED:
- return TxState.COMMITTED;
- case ROLLED_BACK:
- return TxState.ABORTED;
- default:
- throw new IllegalStateException("Unexpected state: " + state);
- }
- }
-
- /** */
private void recordStateChangedEvent(TransactionState state) {
if (!near() || !local()) // Covers only GridNearTxLocal's state changes.
return;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 7b6cd7f..50d07d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpda
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -937,7 +938,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
U.warn(log, "Failed to commit transaction, node is stopping [tx=" + CU.txString(this) +
", err=" + ex + ']');
- return;
+ boolean persistenceEnabled = CU.isPersistenceEnabled(cctx.kernalContext().config());
+
+ if (persistenceEnabled) {
+ GridCacheDatabaseSharedManager dbManager = (GridCacheDatabaseSharedManager)cctx.database();
+
+ dbManager.getCheckpointer().skipCheckpointOnNodeStop(true);
+ }
+
+ throw ex;
}
err = heuristicException(ex);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index c3c07b5..c153caa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
@@ -34,6 +35,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteException;
@@ -53,6 +55,7 @@ import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
import org.apache.ignite.internal.managers.systemview.walker.TransactionViewWalker;
+import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.MvccTxRecord;
import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -88,6 +91,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearOpti
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage;
+import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.transactions.TxDeadlockDetection.TxDeadlockFuture;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -198,6 +202,23 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
private static final int SLOW_TX_WARN_TIMEOUT =
Integer.getInteger(IGNITE_SLOW_TX_WARN_TIMEOUT, DFLT_SLOW_TX_WARN_TIMEOUT);
+ /** Tests whether a tx record is in a completed state: {@code COMMITTED} or {@code ROLLED_BACK}. */
+ public static final Predicate<TxRecord> COMPLETED_TX_STATES = new Predicate<TxRecord>() {
+ @Override public boolean test(TxRecord txRec) {
+ return txRec.state() == COMMITTED || txRec.state() == ROLLED_BACK;
+ }
+ };
+
+ /** Tests whether a tx record is in a prepared state: {@code PREPARED} or {@code PREPARING}. */
+ public static final Predicate<TxRecord> PREPARED_TX_STATES = new Predicate<TxRecord>() {
+ @Override public boolean test(TxRecord txRec) {
+ return txRec.state() == PREPARED || txRec.state() == PREPARING;
+ }
+ };
+
+ /** Near versions of transactions seen as prepared but not yet completed while reading tx WAL records. */
+ private Set<GridCacheVersion> uncommitedTx = new HashSet<>();
+
/** One phase commit deferred ack request timeout. */
public static final int DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT =
Integer.getInteger(IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT,
@@ -2842,20 +2863,36 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @param tx Transaction.
* @param state New state.
*/
- public void setMvccState(IgniteInternalTx tx, byte state) {
+ public void setMvccState(IgniteInternalTx tx, TransactionState state) {
if (cctx.kernalContext().clientNode() || tx.mvccSnapshot() == null || tx.near() && !tx.local())
return;
+ byte state0 = toMvccState(state);
+
cctx.database().checkpointReadLock();
try {
- cctx.coordinators().updateState(tx.mvccSnapshot(), state, tx.local());
+ cctx.coordinators().updateState(tx.mvccSnapshot(), state0, tx.local());
}
finally {
cctx.database().checkpointReadUnlock();
}
}
+ /** */
+ private byte toMvccState(TransactionState state) {
+ switch (state) {
+ case PREPARED:
+ return TxState.PREPARED;
+ case COMMITTED:
+ return TxState.COMMITTED;
+ case ROLLED_BACK:
+ return TxState.ABORTED;
+ default:
+ throw new IllegalStateException("Unexpected state: " + state);
+ }
+ }
+
/**
* Finishes MVCC transaction.
* @param tx Transaction.
@@ -3764,4 +3801,29 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
return 0;
}
}
+
+ /**
+ * Updates the set of uncommitted transactions: a prepared record adds the tx, a completed record removes it.
+ *
+ * @param txRec Tx record.
+ */
+ public void collectTxStates(final TxRecord txRec) {
+ if (COMPLETED_TX_STATES.test(txRec))
+ uncommitedTx.remove(txRec.nearXidVersion());
+ else if (PREPARED_TX_STATES.test(txRec))
+ uncommitedTx.add(txRec.nearXidVersion());
+ }
+
+ /**
+ * @param dataEntry Processing entry.
+ * @return {@code true} if the entry's near tx version belongs to a transaction that is not completed.
+ */
+ public boolean uncommitedTx(final DataEntry dataEntry) {
+ return uncommitedTx.contains(dataEntry.nearXidVersion());
+ }
+
+ /** Clears tx states collections. */
+ public void clearUncommitedStates() {
+ uncommitedTx = Collections.emptySet();
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryTest.java
index 8cc7c3f..3f8f853 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryTest.java
@@ -480,7 +480,7 @@ public class IgniteLogicalRecoveryTest extends GridCommonAbstractTest {
.collect(Collectors.toList());
Assert.assertTrue("There was unexpected rebalance for some groups" +
- " [node=" + node.name() + ", groups=" + rebalancedGroups + ']', rebalancedGroups.isEmpty());
+ " [node=" + node.name() + ", groups=" + rebalancedGroups + ']', rebalancedGroups.isEmpty());
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryWithParamsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryWithParamsTest.java
new file mode 100644
index 0000000..aac00dc
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryWithParamsTest.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.file.OpenOption;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.FileWriteHandle;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS;
+import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP;
+import static org.apache.ignite.testframework.GridTestUtils.DFLT_BUSYWAIT_SLEEP_INTERVAL;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+
+/**
+ * A set of tests that check correctness of logical recovery performed during node start.
+ */
+@RunWith(Parameterized.class)
+public class IgniteLogicalRecoveryWithParamsTest extends GridCommonAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setConsistentId(igniteInstanceName);
+
+ cfg.setFailureHandler(new StopNodeFailureHandler());
+
+ DataStorageConfiguration dsCfg = new DataStorageConfiguration()
+ .setWalMode(WALMode.LOG_ONLY)
+ .setCheckpointFrequency(1024 * 1024 * 1024) // Disable automatic checkpoints.
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration()
+ .setName("dflt")
+ .setInitialSize(256 * 1024 * 1024)
+ .setMaxSize(256 * 1024 * 1024)
+ .setPersistenceEnabled(true)
+ );
+
+ cfg.setDataStorageConfiguration(dsCfg);
+
+ TestRecordingCommunicationSpi spi = new TestRecordingCommunicationSpi();
+
+ cfg.setCommunicationSpi(spi);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** Parametrized run param : server nodes. */
+ @Parameterized.Parameter(0)
+ public Integer numSrvNodes;
+
+ /** Parametrized run param : single node tx. */
+ @Parameterized.Parameter(1)
+ public Boolean singleNodeTx;
+
+ /** Parametrized run param : backups count. */
+ @Parameterized.Parameter(2)
+ public Integer backups;
+
+ /** Test run configurations: server nodes count, single node tx flag, backups count. */
+ @Parameterized.Parameters(name = "nodesCnt={0}, singleNodeTx={1}, backups={2}")
+ public static Collection<Object[]> runConfig() {
+ return Arrays.asList(new Object[][] {
+ {1, true, 0},
+ {1, true, 1},
+ {1, false, 0},
+ {1, false, 1},
+ {2, true, 0},
+ {2, true, 1},
+ {2, false, 0},
+ {2, false, 1},
+ });
+ }
+
+ /** Tests partially committed transactions with further recovery. */
+ @Test
+ @WithSystemProperty(key = IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP, value = "true")
+ @WithSystemProperty(key = IGNITE_WAL_LOG_TX_RECORDS, value = "true")
+ public void testPartiallyCommitedTx_TwoNode_WithCpOnNodeStop_MultiNodeTx_OneBackup() throws Exception {
+ testPartiallyCommitedTx();
+ }
+
+ /** Tests partially committed transactions with further recovery. */
+ @Test
+ @WithSystemProperty(key = IGNITE_WAL_LOG_TX_RECORDS, value = "true")
+ public void testPartiallyCommitedTx_TwoNode_WithoutCpOnNodeStop_SingleNodeTx() throws Exception {
+ testPartiallyCommitedTx();
+ }
+
+ /**
+ * Tests concurrent tx with node stop and further recovery.
+ *
+ */
+ private void testPartiallyCommitedTx() throws Exception {
+ final String cacheName = "recovery";
+
+ int itmsCount = 30_000;
+
+ AtomicBoolean failFileIO = new AtomicBoolean();
+
+ List<Integer> keys;
+
+ CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<Integer, Integer>(cacheName)
+ .setCacheMode(CacheMode.PARTITIONED)
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+ .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+ .setBackups(backups)
+ .setAffinity(new RendezvousAffinityFunction(false, 32));
+
+ try {
+ final IgniteEx srv = (IgniteEx)startGridsMultiThreaded(numSrvNodes);
+
+ G.allGrids().forEach(n -> setWalIOFactory(n, failFileIO));
+
+ IgniteEx clnt = startClientGrid("client");
+
+ TestRecordingCommunicationSpi nearComm = TestRecordingCommunicationSpi.spi(clnt);
+
+ srv.cluster().state(ClusterState.ACTIVE);
+
+ final IgniteCache cache = clnt.getOrCreateCache(cfg);
+
+ final CountDownLatch commitStart = new CountDownLatch(1);
+
+ forceCheckpoint();
+
+ nearComm.blockMessages((node, msg) -> msg instanceof GridNearTxPrepareRequest);
+
+ if (singleNodeTx)
+ keys = primaryKeys(srv.cache(cacheName), itmsCount, 0);
+ else
+ keys = IntStream.range(0, itmsCount).boxed().collect(Collectors.toList());
+
+ Thread t = new Thread(() -> {
+ try (Transaction tx = clnt.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
+ keys.forEach(k -> cache.put(k, k));
+
+ commitStart.countDown();
+
+ tx.commit();
+ }
+ });
+
+ t.start();
+
+ commitStart.await();
+
+ nearComm.waitForBlocked();
+
+ nearComm.stopBlock();
+
+ assertTrue(waitForWalUpdates(G.allGrids().stream().filter(g -> !g.configuration().isClientMode())
+ .collect(Collectors.toList())));
+ }
+ finally {
+ failFileIO.set(true);
+
+ stopAllGrids(true);
+
+ assertTrue(G.allGrids().isEmpty());
+ }
+
+ final IgniteEx srv = (IgniteEx)startGridsMultiThreaded(numSrvNodes);
+
+ srv.cluster().state(ClusterState.ACTIVE);
+
+ IgniteCache<Integer, Integer> cache = srv.cache(cacheName);
+
+ int cSize = cache.size();
+
+ boolean pr = cache.get(keys.get(0)) == null;
+
+ for (int i : keys) {
+ Object res = cache.get(i);
+
+ if (pr != (res == null))
+ assertEquals("ethalon=" + pr + ", current=" + res + ", key=" + i, pr, res == null);
+ }
+
+ assert (cSize == itmsCount || cSize == 0) : "unexpected cache size: " + cSize;
+ }
+
+ /** */
+ private boolean waitForWalUpdates(Collection<Ignite> grids) throws IgniteInterruptedCheckedException {
+ long start = U.currentTimeMillis();
+
+ int[] offsets = new int[grids.size()];
+
+ int gCnt = 0;
+
+ for (Ignite grid : grids)
+ offsets[gCnt++] = getWalPos(grid);
+
+ while (true) {
+ gCnt = 0;
+
+ for (Ignite grid : grids) {
+ if (getWalPos(grid) - offsets[gCnt++] > 100)
+ return true;
+ }
+
+ U.sleep(DFLT_BUSYWAIT_SLEEP_INTERVAL / 2);
+
+ if (U.currentTimeMillis() - start > 20_000)
+ return false;
+ }
+ }
+
+ /**
+ * Sets file IO factory.
+ *
+ * @param grid Ignite instance.
+ * @param canFail If {@code true} throws exception on write.
+ *
+ */
+ private void setWalIOFactory(Ignite grid, AtomicBoolean canFail) {
+ IgniteEx grid0 = (IgniteEx)grid;
+
+ FileWriteAheadLogManager walMgr = (FileWriteAheadLogManager)grid0.context().cache().context().wal();
+
+ walMgr.setFileIOFactory(new FailingFileIOFactory(canFail));
+ }
+
+ /**
+ * @param grid Ignite instance.
+ * @return Current WAL position (file offset of the current write handle).
+ */
+ private int getWalPos(Ignite grid) {
+ IgniteEx grid0 = (IgniteEx)grid;
+
+ FileWriteAheadLogManager walMgr = (FileWriteAheadLogManager)grid0.context().cache().context().wal();
+
+ FileWriteHandle fhAfter = U.field(walMgr, "currHnd");
+
+ try {
+ fhAfter.fsync(null);
+ }
+ catch (IgniteCheckedException e) {
+ U.warn(log, e);
+ }
+
+ return fhAfter.position().fileOffset();
+ }
+
+ /**
+ * Creates file I/O whose write operations fail once the flag is set.
+ */
+ private static class FailingFileIOFactory implements FileIOFactory {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Flag that makes write operations fail when set. */
+ private AtomicBoolean fail;
+
+ /** Delegate factory. */
+ private final FileIOFactory delegateFactory = new RandomAccessFileIOFactory();
+
+ /** */
+ FailingFileIOFactory(AtomicBoolean fail) {
+ this.fail = fail;
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+ final FileIO delegate = delegateFactory.create(file, modes);
+
+ return new FileIODecorator(delegate) {
+ @Override public int write(ByteBuffer srcBuf) throws IOException {
+ if (fail != null && fail.get())
+ throw new IOException("No space left on device");
+
+ return super.write(srcBuf);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int write(ByteBuffer srcBuf, long position) throws IOException {
+ if (fail != null && fail.get())
+ throw new IOException("No space left on device");
+
+ return delegate.write(srcBuf, position);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int write(byte[] buf, int off, int len) throws IOException {
+ if (fail != null && fail.get())
+ throw new IOException("No space left on device");
+
+ return delegate.write(buf, off, len);
+ }
+
+ /** {@inheritDoc} */
+ @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
+ return delegate.map(sizeBytes);
+ }
+ };
+ }
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
index 4f6b671..26e1df4 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.persistence.IgnitePersistentS
import org.apache.ignite.internal.processors.cache.persistence.PersistenceDirectoryWarningLoggingTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgniteCacheGroupsWithRestartsTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgniteLogicalRecoveryTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.IgniteLogicalRecoveryWithParamsTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsMultiNodePutGetRestartTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPageEvictionTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgniteSequentialNodeCrashRecoveryTest;
@@ -93,6 +94,7 @@ import org.junit.runners.Suite;
IgnitePdsCorruptedIndexTest.class,
IgniteLogicalRecoveryTest.class,
+ IgniteLogicalRecoveryWithParamsTest.class,
IgniteSequentialNodeCrashRecoveryTest.class,