You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by mm...@apache.org on 2021/03/13 13:47:58 UTC
[ignite] branch master updated: IGNITE-14134 Add snapshot partition
iterator for hash check (#8767)
This is an automated email from the ASF dual-hosted git repository.
mmuzaf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 1dd263c IGNITE-14134 Add snapshot partition iterator for hash check (#8767)
1dd263c is described below
commit 1dd263c40aedb60e5bad4b4eb924b938665905c5
Author: Maxim Muzafarov <mm...@apache.org>
AuthorDate: Sat Mar 13 16:47:28 2021 +0300
IGNITE-14134 Add snapshot partition iterator for hash check (#8767)
---
.../apache/ignite/util/GridCommandHandlerTest.java | 19 +-
.../cache/persistence/CacheDataRowAdapter.java | 77 ++++-
.../persistence/file/FilePageStoreManager.java | 3 +-
.../snapshot/IgniteSnapshotManager.java | 314 ++++++++++++++++++++-
.../snapshot/SnapshotPartitionsVerifyTask.java | 30 +-
.../persistence/tree/io/AbstractDataPageIO.java | 2 +-
.../internal/processors/cache/tree/DataRow.java | 7 +
.../cache/verify/IdleVerifyResultV2.java | 21 ++
.../processors/cache/verify/IdleVerifyUtility.java | 54 ++++
.../cache/verify/PartitionHashRecordV2.java | 10 +-
.../cache/verify/VerifyBackupPartitionsTaskV2.java | 123 +++-----
.../ignite/plugin/security/SecurityPermission.java | 2 +-
.../db/IgnitePdsDataRegionMetricsTest.java | 19 +-
.../snapshot/AbstractSnapshotSelfTest.java | 21 +-
.../snapshot/IgniteClusterSnapshotCheckTest.java | 230 ++++++++++++++-
.../snapshot/IgniteSnapshotManagerSelfTest.java | 180 ++++++++++--
16 files changed, 940 insertions(+), 172 deletions(-)
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
index f9e3ccb..173b850 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
@@ -1996,28 +1996,21 @@ public class GridCommandHandlerTest extends GridCommandHandlerClusterPerMethodAb
node.cluster().active(true);
- IgniteCache cache = node.createCache(new CacheConfiguration<>()
+ IgniteCache<Integer, Integer> cache = node.getOrCreateCache(new CacheConfiguration<Integer, Integer>()
.setAffinity(new RendezvousAffinityFunction(false, 32))
.setBackups(1)
- .setName(DEFAULT_CACHE_NAME)
- );
+ .setName(DEFAULT_CACHE_NAME));
AtomicBoolean stopFlag = new AtomicBoolean();
- Thread loadThread = new Thread(() -> {
+ IgniteInternalFuture<?> loadFut = GridTestUtils.runMultiThreadedAsync(() -> {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
- while (!stopFlag.get()) {
+ while (!stopFlag.get() && !Thread.currentThread().isInterrupted())
cache.put(rnd.nextInt(1000), rnd.nextInt(1000));
-
- if (Thread.interrupted())
- break;
- }
- });
+ }, 5, "load-thread-");
try {
- loadThread.start();
-
doSleep(checkpointFreq);
injectTestSystemOut();
@@ -2029,7 +2022,7 @@ public class GridCommandHandlerTest extends GridCommandHandlerClusterPerMethodAb
stopFlag.set(true);
- loadThread.join();
+ loadFut.get();
}
String out = testOut.toString();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
index ff15bd0..fb289b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
@@ -37,8 +37,11 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTreeRun
import org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVersionIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPagePayload;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.lang.IgniteThrowableFunction;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -52,6 +55,8 @@ import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_CR
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_OP_COUNTER_NA;
import static org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter.RowData.KEY_ONLY;
import static org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter.RowData.LINK_WITH_HEADER;
+import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.T_DATA;
+import static org.apache.ignite.internal.util.GridUnsafe.wrapPointer;
/**
* Cache data row adapter.
@@ -161,6 +166,56 @@ public class CacheDataRowAdapter implements CacheDataRow {
}
/**
+ * @param sctx Cache shared context.
+ * @param coctx Cache object context for data deserialization.
+ * @param reader Reader to read fragmented pages.
+ * @param pageBuff Initial page buffer with headers.
+ * @param itemId Item id to read.
+ * @param readCacheId {@code true} if cache id must be read.
+ * @param rowData Which data from page must be read.
+ * @param skipVer {@code true} if cache version must be skipped.
+ * @throws IgniteCheckedException If fails.
+ */
+ public final void initFromPageBuffer(
+ GridCacheSharedContext<?, ?> sctx,
+ CacheObjectContext coctx,
+ IgniteThrowableFunction<Long, ByteBuffer> reader,
+ ByteBuffer pageBuff,
+ int itemId,
+ boolean readCacheId,
+ RowData rowData,
+ boolean skipVer
+ ) throws IgniteCheckedException {
+ long nextLink;
+ int itemId0 = itemId;
+ ByteBuffer buff = pageBuff;
+ IncompleteObject<?> incomplete = null;
+
+ for (;;) {
+ long pageAddr = GridUnsafe.bufferAddress(buff);
+
+ incomplete = readIncomplete(incomplete, sctx, coctx, buff.capacity(), buff.capacity(),
+ pageAddr, itemId0, PageIO.getPageIO(T_DATA, PageIO.getVersion(buff)), rowData, readCacheId, skipVer);
+
+ if (incomplete == null)
+ break;
+
+ nextLink = incomplete.getNextLink();
+
+ if (nextLink == 0)
+ break;
+ else {
+ buff = reader.apply(pageId(nextLink));
+ itemId0 = itemId(nextLink);
+
+ assert itemId0 == 0 : "Only one item is possible on the fragmented page: " + PageIdUtils.toDetailString(nextLink);
+ }
+ }
+
+ assert isReady() : "Entry must has the 'ready' state, when the init ends";
+ }
+
+ /**
* @param io Data page IO.
* @param pageAddr Data page address.
* @param itemId Row item Id.
@@ -189,8 +244,8 @@ public class CacheDataRowAdapter implements CacheDataRow {
int grpId = grp != null ? grp.groupId() : 0;
IoStatisticsHolder statHolder = grp != null ? grp.statisticsHolderData() : IoStatisticsHolderNoOp.INSTANCE;
- IncompleteObject<?> incomplete = readIncomplete(null, sharedCtx, coctx, pageMem,
- grpId, pageAddr, itemId, io, rowData, readCacheId, skipVer);
+ IncompleteObject<?> incomplete = readIncomplete(null, sharedCtx, coctx, pageMem.pageSize(),
+ pageMem.realPageSize(grpId), pageAddr, itemId, io, rowData, readCacheId, skipVer);
if (incomplete != null) {
// Initialize the remaining part of the large row from other pages.
@@ -258,8 +313,8 @@ public class CacheDataRowAdapter implements CacheDataRow {
int itemId = itemId(nextLink);
- incomplete = readIncomplete(incomplete, sharedCtx, coctx, pageMem,
- grpId, pageAddr, itemId, io, rowData, readCacheId, skipVer);
+ incomplete = readIncomplete(incomplete, sharedCtx, coctx, pageMem.pageSize(),
+ pageMem.realPageSize(grpId), pageAddr, itemId, io, rowData, readCacheId, skipVer);
if (incomplete == null || (rowData == KEY_ONLY && key != null))
return;
@@ -299,8 +354,8 @@ public class CacheDataRowAdapter implements CacheDataRow {
* @param incomplete Incomplete object.
* @param sharedCtx Cache shared context.
* @param coctx Cache object context.
- * @param pageMem Page memory.
- * @param grpId Cache group Id.
+ * @param realPageSize Page size without overhead.
+ * @param pageSize Page size.
* @param pageAddr Page address.
* @param io Page IO.
* @param rowData Required row data.
@@ -309,12 +364,12 @@ public class CacheDataRowAdapter implements CacheDataRow {
* @return Incomplete object.
* @throws IgniteCheckedException If failed.
*/
- private IncompleteObject<?> readIncomplete(
+ protected IncompleteObject<?> readIncomplete(
IncompleteObject<?> incomplete,
GridCacheSharedContext<?, ?> sharedCtx,
CacheObjectContext coctx,
- PageMemory pageMem,
- int grpId,
+ int pageSize,
+ int realPageSize,
long pageAddr,
int itemId,
DataPageIO io,
@@ -322,7 +377,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
boolean readCacheId,
boolean skipVer
) throws IgniteCheckedException {
- DataPagePayload data = io.readPayload(pageAddr, itemId, pageMem.realPageSize(grpId));
+ DataPagePayload data = io.readPayload(pageAddr, itemId, realPageSize);
long nextLink = data.nextLink();
@@ -343,7 +398,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
return null;
}
- ByteBuffer buf = pageMem.pageBuffer(pageAddr);
+ ByteBuffer buf = wrapPointer(pageAddr, pageSize);
int off = data.offset() + hdrLen;
int payloadSize = data.payloadSize() - hdrLen;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index 755dc6f..7110dcd 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -1003,7 +1003,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
* @param dir Directory to check.
* @return Files that match cache or cache group pattern.
*/
- public static List<File> cacheDirectories(File dir) {
+ public static List<File> cacheDirectories(File dir, Predicate<String> names) {
File[] files = dir.listFiles();
if (files == null)
@@ -1013,6 +1013,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
.sorted()
.filter(File::isDirectory)
.filter(f -> f.getName().startsWith(CACHE_DIR_PREFIX) || f.getName().startsWith(CACHE_GRP_DIR_PREFIX))
+ .filter(f -> names.test(cacheGroupName(f)))
.collect(Collectors.toList());
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index f59f535..d23b584 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -37,12 +37,16 @@ import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
+import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -71,12 +75,17 @@ import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
+import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
+import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.CacheType;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
@@ -88,22 +97,30 @@ import org.apache.ignite.internal.processors.cache.persistence.metastorage.Metas
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPagePayload;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
+import org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext;
+import org.apache.ignite.internal.processors.cache.tree.DataRow;
import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.processors.marshaller.MappedName;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.GridBusyLock;
+import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
import org.apache.ignite.internal.util.distributed.DistributedProcess;
import org.apache.ignite.internal.util.distributed.InitMessage;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
+import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.internal.util.lang.IgniteThrowableFunction;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
@@ -131,20 +148,32 @@ import static org.apache.ignite.internal.GridClosureCallMode.BALANCE;
import static org.apache.ignite.internal.GridClosureCallMode.BROADCAST;
import static org.apache.ignite.internal.IgniteFeatures.PERSISTENCE_CACHE_SNAPSHOT;
import static org.apache.ignite.internal.MarshallerContextImpl.mappingFileStoreWorkDir;
+import static org.apache.ignite.internal.MarshallerContextImpl.resolveMappingFileStoreWorkDir;
import static org.apache.ignite.internal.MarshallerContextImpl.saveMappings;
import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.MAX_PARTITION_ID;
+import static org.apache.ignite.internal.pagemem.PageIdUtils.flag;
+import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagemem.PageIdUtils.pageIndex;
+import static org.apache.ignite.internal.pagemem.PageIdUtils.toDetailString;
import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.binaryWorkDir;
+import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.resolveBinaryWorkDir;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_TEMPLATE;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheDirectories;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFile;
import static org.apache.ignite.internal.processors.cache.persistence.filename.PdsConsistentIdProcessor.DB_DEFAULT_FOLDER;
import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId;
+import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.T_DATA;
+import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getPageIO;
+import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getType;
+import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getVersion;
import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SKIP_AUTH;
import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID;
+import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
import static org.apache.ignite.internal.util.IgniteUtils.isLocalNodeCoordinator;
import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.END_SNAPSHOT;
import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.START_SNAPSHOT;
@@ -851,7 +880,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
if (!snpDir.exists())
return Collections.emptyList();
- return cacheDirectories(new File(snpDir, databaseRelativePath(folderName)));
+ return cacheDirectories(new File(snpDir, databaseRelativePath(folderName)), name -> true);
}
/**
@@ -1124,6 +1153,62 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
}
/**
+ * @param snpName Snapshot name.
+ * @param folderName The node folder name, usually it's the same as the U.maskForFileName(consistentId).
+ * @param grpName Cache group name.
+ * @param partId Partition id.
+ * @return Iterator over partition.
+ * @throws IgniteCheckedException If and error occurs.
+ */
+ public GridCloseableIterator<CacheDataRow> partitionRowIterator(String snpName,
+ String folderName,
+ String grpName,
+ int partId
+ ) throws IgniteCheckedException {
+ File snpDir = snapshotLocalDir(snpName);
+
+ if (!snpDir.exists())
+ throw new IgniteCheckedException("Snapshot directory doesn't exists: " + snpDir.getAbsolutePath());
+
+ File nodePath = new File(snpDir, databaseRelativePath(folderName));
+
+ if (!nodePath.exists())
+ throw new IgniteCheckedException("Consistent id directory doesn't exists: " + nodePath.getAbsolutePath());
+
+ List<File> grps = cacheDirectories(nodePath, name -> name.equals(grpName));
+
+ if (F.isEmpty(grps))
+ throw new IgniteCheckedException("The snapshot cache group not found [dir=" + snpDir.getAbsolutePath() + ", grpName=" + grpName + ']');
+
+ if (grps.size() > 1)
+ throw new IgniteCheckedException("The snapshot cache group directory cannot be uniquely identified [dir=" + snpDir.getAbsolutePath() + ", grpName=" + grpName + ']');
+
+ File snpPart = getPartitionFile(new File(snapshotLocalDir(snpName), databaseRelativePath(folderName)),
+ grps.get(0).getName(), partId);
+
+ FilePageStore pageStore = (FilePageStore)storeFactory
+ .apply(CU.cacheId(grpName), false)
+ .createPageStore(getTypeByPartId(partId),
+ snpPart::toPath,
+ val -> {
+ });
+
+ GridKernalContext kctx = new StandaloneGridKernalContext(log,
+ resolveBinaryWorkDir(snpDir.getAbsolutePath(), folderName),
+ resolveMappingFileStoreWorkDir(snpDir.getAbsolutePath()));
+
+ CacheObjectContext coctx = new CacheObjectContext(kctx, grpName, null, false,
+ false, false, false, false);
+
+ GridCacheSharedContext<?, ?> sctx = new GridCacheSharedContext<>(kctx, null, null, null,
+ null, null, null, null, null, null,
+ null, null, null, null, null,
+ null, null, null, null, null, null);
+
+ return new DataPageIterator(sctx, coctx, pageStore, partId);
+ }
+
+ /**
* @param snpName Unique snapshot name.
* @param srcNodeId Node id which cause snapshot operation.
* @param parts Collection of pairs group and appropriate cache partition to be snapshot.
@@ -1282,6 +1367,233 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
}
/**
+ * Ves pokrit assertami absolutely ves,
+ * PageScan iterator in the ignite core est.
+ */
+ private static class DataPageIterator extends GridCloseableIteratorAdapter<CacheDataRow> {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Page store to iterate over. */
+ @GridToStringExclude
+ private final PageStore store;
+
+ /** Page store partition id. */
+ private final int partId;
+
+ /** Grid cache shared context. */
+ private final GridCacheSharedContext<?, ?> sctx;
+
+ /** Cache object context for key/value deserialization. */
+ private final CacheObjectContext coctx;
+
+ /** Buffer to read pages. */
+ private final ByteBuffer locBuff;
+
+ /** Buffer to read the rest part of fragmented rows. */
+ private final ByteBuffer fragmentBuff;
+
+ /** Total pages in the page store. */
+ private final int pages;
+
+ /**
+ * Data row greater than page size contains with header and tail parts. Such pages with tails contain only part
+ * of a cache key-value pair. These pages will be marked and skipped at the first partition iteration and
+ * will be processed on the second partition iteration when all the pages with key-value headers defined.
+ */
+ private final BitSet tailPages;
+
+ /** Pages which already read and must be skipped. */
+ private final BitSet readPages;
+
+ /** Batch of rows read through iteration. */
+ private final Deque<CacheDataRow> rows = new LinkedList<>();
+
+ /** {@code true} if the iteration though partition reached its end. */
+ private boolean secondScanComplete;
+
+ /**
+ * Current partition page index for read. Due to we read the partition twice it
+ * can't be greater that 2 * store.size().
+ */
+ private int currIdx;
+
+ /**
+ * During scanning a cache partition presented as {@code PageStore} we must guarantee the following:
+ * all the pages of this storage remains unchanged during the Iterator remains opened, the stored data
+ * keeps its consistency. We can't read the {@code PageStore} during an ongoing checkpoint over it.
+ *
+ * @param coctx Cache object context.
+ * @param store Page store to read.
+ * @param partId Partition id.
+ * @throws IgniteCheckedException If fails.
+ */
+ public DataPageIterator(
+ GridCacheSharedContext<?, ?> sctx,
+ CacheObjectContext coctx,
+ PageStore store,
+ int partId
+ ) throws IgniteCheckedException {
+ this.store = store;
+ this.partId = partId;
+ this.coctx = coctx;
+ this.sctx = sctx;
+
+ store.ensure();
+ pages = store.pages();
+ tailPages = new BitSet(pages);
+ readPages = new BitSet(pages);
+
+ locBuff = ByteBuffer.allocateDirect(store.getPageSize())
+ .order(ByteOrder.nativeOrder());
+ fragmentBuff = ByteBuffer.allocateDirect(store.getPageSize())
+ .order(ByteOrder.nativeOrder());
+ }
+
+ /** {@inheritDoc */
+ @Override protected CacheDataRow onNext() throws IgniteCheckedException {
+ if (secondScanComplete && rows.isEmpty())
+ throw new NoSuchElementException("[partId=" + partId + ", store=" + store + ", skipPages=" + readPages + ']');
+
+ return rows.poll();
+ }
+
+ /** {@inheritDoc */
+ @Override protected boolean onHasNext() throws IgniteCheckedException {
+ if (secondScanComplete && rows.isEmpty())
+ return false;
+
+ try {
+ for (; currIdx < 2 * pages && rows.isEmpty(); currIdx++) {
+ boolean first = currIdx < pages;
+ int pageIdx = currIdx % pages;
+
+ if (readPages.get(pageIdx) || (!first && tailPages.get(pageIdx)))
+ continue;
+
+ if (!readPageFromStore(pageId(partId, FLAG_DATA, pageIdx), locBuff)) {
+ // Skip not FLAG_DATA pages.
+ setBit(readPages, pageIdx);
+
+ continue;
+ }
+
+ long pageAddr = bufferAddress(locBuff);
+ DataPageIO io = getPageIO(T_DATA, getVersion(pageAddr));
+ int freeSpace = io.getFreeSpace(pageAddr);
+ int rowsCnt = io.getDirectCount(pageAddr);
+
+ if (first) {
+ // Skip empty pages.
+ if (rowsCnt == 0) {
+ setBit(readPages, pageIdx);
+
+ continue;
+ }
+
+ // There is no difference between a page containing an incomplete DataRow fragment and
+ // the page where DataRow takes up all the free space. There is no a dedicated
+ // flag for this case in page header.
+ // During the storage scan we can skip such pages at the first iteration over the partition file,
+ // since all the fragmented pages will be marked by BitSet array we will safely read the others
+ // on the second iteration.
+ if (freeSpace == 0 && rowsCnt == 1) {
+ DataPagePayload payload = io.readPayload(pageAddr, 0, locBuff.capacity());
+
+ long link = payload.nextLink();
+
+ if (link != 0)
+ setBit(tailPages, pageIndex(pageId(link)));
+
+ continue;
+ }
+ }
+
+ setBit(readPages, pageIdx);
+
+ for (int itemId = 0; itemId < rowsCnt; itemId++) {
+ DataRow row = new DataRow();
+
+ row.partition(partId);
+
+ row.initFromPageBuffer(
+ sctx,
+ coctx,
+ new IgniteThrowableFunction<Long, ByteBuffer>() {
+ @Override public ByteBuffer apply(Long nextPageId) throws IgniteCheckedException {
+ boolean success = readPageFromStore(nextPageId, fragmentBuff);
+
+ assert success : "Only FLAG_DATA pages allowed: " + toDetailString(nextPageId);
+
+ // Fragment of page has been read, might be skipped further.
+ setBit(readPages, pageIndex(nextPageId));
+
+ return fragmentBuff;
+ }
+ },
+ locBuff,
+ itemId,
+ false,
+ CacheDataRowAdapter.RowData.FULL,
+ false);
+
+ rows.add(row);
+ }
+ }
+
+ if (currIdx == 2 * pages) {
+ secondScanComplete = true;
+
+ boolean set = true;
+
+ for (int j = 0; j < pages; j++)
+ set &= readPages.get(j);
+
+ assert set : "readPages=" + readPages + ", pages=" + pages;
+ }
+
+ return !rows.isEmpty();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteCheckedException("Error during iteration through page store: " + this, e);
+ }
+ }
+
+ /**
+ * @param bitSet BitSet to change bit index.
+ * @param idx Index of bit to change.
+ */
+ private static void setBit(BitSet bitSet, int idx) {
+ boolean bit = bitSet.get(idx);
+
+ assert !bit : "Bit with given index already set: " + idx;
+
+ bitSet.set(idx);
+ }
+
+ /**
+ * @param pageId Page id to read from store.
+ * @param buff Buffer to read page into.
+ * @return {@code true} if page read with given type flag.
+ * @throws IgniteCheckedException If fails.
+ */
+ private boolean readPageFromStore(long pageId, ByteBuffer buff) throws IgniteCheckedException {
+ buff.clear();
+
+ boolean read = store.read(pageId, buff, true);
+
+ assert read : toDetailString(pageId);
+
+ return getType(buff) == flag(pageId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DataPageIterator.class, this, super.toString());
+ }
+ }
+
+ /**
* Snapshot sender which writes all data to local directory.
*/
private class LocalSnapshotSender extends SnapshotSender {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
index c6a8a72..e717a45 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -37,7 +38,6 @@ import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeTaskAdapter;
import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
@@ -63,9 +63,13 @@ import static org.apache.ignite.internal.processors.cache.persistence.file.FileP
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cachePartitionFiles;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId;
-import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.checkPartitionsPageCrcSum;
+import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.calculatePartitionHash;
-/** */
+/**
+ * Task for checking snapshot partitions consistency the same way as {@link VerifyBackupPartitionsTaskV2} does.
+ * Since a snapshot partitions already stored apart on disk the is no requirement for a cluster upcoming updates
+ * to be hold on.
+ */
@GridInternal
public class SnapshotPartitionsVerifyTask
extends ComputeTaskAdapter<Map<ClusterNode, List<SnapshotMetadata>>, IdleVerifyResultV2> {
@@ -207,7 +211,7 @@ public class SnapshotPartitionsVerifyTask
", meta=" + meta + ']');
}
- Map<PartitionKeyV2, PartitionHashRecordV2> res = new HashMap<>();
+ Map<PartitionKeyV2, PartitionHashRecordV2> res = new ConcurrentHashMap<>();
ThreadLocal<ByteBuffer> buff = ThreadLocal.withInitial(() -> ByteBuffer.allocateDirect(meta.pageSize())
.order(ByteOrder.nativeOrder()));
@@ -253,13 +257,21 @@ public class SnapshotPartitionsVerifyTask
+ ", size=" + size + "]");
}
- checkPartitionsPageCrcSum(() -> pageStore, partId, PageIdAllocator.FLAG_DATA);
-
// Snapshot partitions must always be in OWNING state.
// There is no `primary` partitions for snapshot.
- res.computeIfAbsent(new PartitionKeyV2(grpId, partId, grpName),
- key -> new PartitionHashRecordV2(key, false, consId,
- 0, updateCntr, size, PartitionHashRecordV2.PartitionState.OWNING));
+ PartitionKeyV2 key = new PartitionKeyV2(grpId, partId, grpName);
+
+ PartitionHashRecordV2 hash = calculatePartitionHash(key,
+ updateCntr,
+ consId,
+ GridDhtPartitionState.OWNING,
+ false,
+ size,
+ snpMgr.partitionRowIterator(snpName, meta.folderName(), grpName, partId));
+
+ assert hash != null : "OWNING must have hash: " + key;
+
+ res.put(key, hash);
}
}
catch (IOException e) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java
index 55690b8..20e43ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java
@@ -371,7 +371,7 @@ public abstract class AbstractDataPageIO<T extends Storable> extends PageIO impl
* @param pageAddr Page address.
* @return Direct count.
*/
- private int getDirectCount(long pageAddr) {
+ public int getDirectCount(long pageAddr) {
return PageUtils.getByte(pageAddr, DIRECT_CNT_OFF) & 0xFF;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
index add2abe..f8d2ac0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
@@ -110,6 +110,13 @@ public class DataRow extends CacheDataRowAdapter {
return part;
}
+ /**
+ * @param partId Partition id.
+ */
+ public void partition(int partId) {
+ part = partId;
+ }
+
/** {@inheritDoc} */
@Override public int hash() {
return hash;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyResultV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyResultV2.java
index 637306c..38e196b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyResultV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyResultV2.java
@@ -22,6 +22,7 @@ import java.io.ObjectOutput;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.function.Consumer;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -285,6 +286,26 @@ public class IdleVerifyResultV2 extends VisorDataTransferObject {
}
/** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ IdleVerifyResultV2 v2 = (IdleVerifyResultV2)o;
+
+ return Objects.equals(cntrConflicts, v2.cntrConflicts) && Objects.equals(hashConflicts, v2.hashConflicts) &&
+ Objects.equals(movingPartitions, v2.movingPartitions) && Objects.equals(lostPartitions, v2.lostPartitions) &&
+ Objects.equals(exceptions, v2.exceptions);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return Objects.hash(cntrConflicts, hashConflicts, movingPartitions, lostPartitions, exceptions);
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(IdleVerifyResultV2.class, this);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
index f38fd93..330b565 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
@@ -21,11 +21,13 @@ import java.io.File;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.pagemem.PageIdAllocator;
@@ -36,7 +38,9 @@ import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
+import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.lang.IgniteThrowableSupplier;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.SB;
@@ -224,6 +228,56 @@ public class IdleVerifyUtility {
}
/**
+ * @param partKey Partition key.
+ * @param updCntr Partition update counter prior check.
+ * @param consId Local node consistent id.
+ * @param state Partition state to check.
+ * @param isPrimary {@code true} if partition is primary.
+ * @param partSize Partition size on disk.
+ * @param it Iterator though partition data rows.
+ * @throws IgniteCheckedException If fails.
+ * @return Map of calculated partition.
+ */
+ public static @Nullable PartitionHashRecordV2 calculatePartitionHash(
+ PartitionKeyV2 partKey,
+ long updCntr,
+ Object consId,
+ GridDhtPartitionState state,
+ boolean isPrimary,
+ long partSize,
+ GridIterator<CacheDataRow> it
+ ) throws IgniteCheckedException {
+ if (state == GridDhtPartitionState.MOVING || state == GridDhtPartitionState.LOST) {
+ return new PartitionHashRecordV2(partKey,
+ isPrimary,
+ consId,
+ 0,
+ updCntr,
+ state == GridDhtPartitionState.MOVING ?
+ PartitionHashRecordV2.MOVING_PARTITION_SIZE : 0,
+ state == GridDhtPartitionState.MOVING ?
+ PartitionHashRecordV2.PartitionState.MOVING : PartitionHashRecordV2.PartitionState.LOST);
+ }
+
+ if (state != GridDhtPartitionState.OWNING)
+ return null;
+
+ int partHash = 0;
+
+ while (it.hasNextX()) {
+ CacheDataRow row = it.nextX();
+
+ partHash += row.key().hashCode();
+
+ // Object context is not required since the valueBytes have been read directly from page.
+ partHash += Arrays.hashCode(row.value().valueBytes(null));
+ }
+
+ return new PartitionHashRecordV2(partKey, isPrimary, consId, partHash, updCntr,
+ partSize, PartitionHashRecordV2.PartitionState.OWNING);
+ }
+
+ /**
* Idle checker.
*/
public static class IdleChecker implements IgniteInClosure<Integer> {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/PartitionHashRecordV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/PartitionHashRecordV2.java
index ce1d1c3..810f46b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/PartitionHashRecordV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/PartitionHashRecordV2.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.processors.cache.verify;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
-
+import java.util.Objects;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -187,17 +187,19 @@ public class PartitionHashRecordV2 extends VisorDataTransferObject {
@Override public boolean equals(Object o) {
if (this == o)
return true;
+
if (o == null || getClass() != o.getClass())
return false;
- PartitionHashRecordV2 record = (PartitionHashRecordV2)o;
+ PartitionHashRecordV2 v2 = (PartitionHashRecordV2)o;
- return consistentId.equals(record.consistentId);
+ return partHash == v2.partHash && updateCntr == v2.updateCntr && size == v2.size && partKey.equals(v2.partKey) &&
+ consistentId.equals(v2.consistentId) && partitionState == v2.partitionState;
}
/** {@inheritDoc} */
@Override public int hashCode() {
- return consistentId.hashCode();
+ return Objects.hash(partKey, consistentId, partHash, updateCntr, size, partitionState);
}
/** **/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTaskV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTaskV2.java
index 08f5b2b..6218814 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTaskV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTaskV2.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.cache.verify;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -51,13 +50,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2.PartitionState;
import org.apache.ignite.internal.processors.task.GridInternal;
-import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskArg;
@@ -72,6 +68,7 @@ import static java.util.Collections.emptyMap;
import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA;
import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.GRID_NOT_IDLE_MSG;
+import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.calculatePartitionHash;
import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.checkPartitionsPageCrcSum;
/**
@@ -504,105 +501,67 @@ public class VerifyBackupPartitionsTaskV2 extends ComputeTaskAdapter<VisorIdleVe
}
/**
- * @param grpCtx Group context.
+ * @param gctx Group context.
* @param part Local partition.
*/
private Future<Map<PartitionKeyV2, PartitionHashRecordV2>> calculatePartitionHashAsync(
- final CacheGroupContext grpCtx,
+ final CacheGroupContext gctx,
final GridDhtLocalPartition part
) {
- return ForkJoinPool.commonPool().submit(() -> calculatePartitionHash(grpCtx, part));
- }
-
- /**
- * @param grpCtx Group context.
- * @param part Local partition.
- */
- private Map<PartitionKeyV2, PartitionHashRecordV2> calculatePartitionHash(
- CacheGroupContext grpCtx,
- GridDhtLocalPartition part
- ) {
- if (!part.reserve())
- return emptyMap();
-
- int partHash = 0;
- long partSize;
+ return ForkJoinPool.commonPool().submit(() -> {
+ Map<PartitionKeyV2, PartitionHashRecordV2> res = emptyMap();
- @Nullable PartitionUpdateCounter updCntr = part.dataStore().partUpdateCounter();
+ if (!part.reserve())
+ return res;
- PartitionUpdateCounter updateCntrBefore = updCntr == null ? null : updCntr.copy();
+ try {
+ PartitionUpdateCounter updCntr = part.dataStore().partUpdateCounter();
+ PartitionUpdateCounter updateCntrBefore = updCntr == null ? null : updCntr.copy();
- PartitionKeyV2 partKey = new PartitionKeyV2(grpCtx.groupId(), part.id(), grpCtx.cacheOrGroupName());
+ if (arg.checkCrc() && gctx.persistenceEnabled()) {
+ FilePageStoreManager pageStoreMgr =
+ (FilePageStoreManager)ignite.context().cache().context().pageStore();
- Object consId = ignite.context().discovery().localNode().consistentId();
+ checkPartitionsPageCrcSum(() -> (FilePageStore)pageStoreMgr.getStore(gctx.groupId(), part.id()),
+ part.id(), FLAG_DATA);
+ }
- boolean isPrimary = part.primary(grpCtx.topology().readyTopologyVersion());
+ PartitionKeyV2 key = new PartitionKeyV2(gctx.groupId(), part.id(), gctx.cacheOrGroupName());
- try {
- if (part.state() == GridDhtPartitionState.MOVING || part.state() == GridDhtPartitionState.LOST) {
- PartitionHashRecordV2 movingHashRecord = new PartitionHashRecordV2(
- partKey,
- isPrimary,
- consId,
- partHash,
+ PartitionHashRecordV2 hash = calculatePartitionHash(key,
updateCntrBefore == null ? 0 : updateCntrBefore.get(),
- part.state() == GridDhtPartitionState.MOVING ? PartitionHashRecordV2.MOVING_PARTITION_SIZE : 0,
- part.state() == GridDhtPartitionState.MOVING ? PartitionState.MOVING : PartitionState.LOST
- );
-
- return Collections.singletonMap(partKey, movingHashRecord);
- }
-
- if (part.state() != GridDhtPartitionState.OWNING)
- return emptyMap();
+ ignite.context().discovery().localNode().consistentId(),
+ part.state(),
+ part.primary(gctx.topology().readyTopologyVersion()),
+ part.dataStore().fullSize(),
+ gctx.offheap().partitionIterator(part.id()));
- partSize = part.dataStore().fullSize();
+ if (hash != null)
+ res = Collections.singletonMap(key, hash);
- if (arg.checkCrc() && grpCtx.persistenceEnabled()) {
- FilePageStoreManager pageStoreMgr =
- (FilePageStoreManager)ignite.context().cache().context().pageStore();
+ PartitionUpdateCounter updateCntrAfter = part.dataStore().partUpdateCounter();
- checkPartitionsPageCrcSum(() -> (FilePageStore)pageStoreMgr.getStore(grpCtx.groupId(), part.id()),
- part.id(), FLAG_DATA);
+ if (updateCntrAfter != null && !updateCntrAfter.equals(updateCntrBefore)) {
+ throw new GridNotIdleException(GRID_NOT_IDLE_MSG + "[grpName=" + gctx.cacheOrGroupName() +
+ ", grpId=" + gctx.groupId() + ", partId=" + part.id() + "] changed during size " +
+ "calculation [updCntrBefore=" + updateCntrBefore + ", updCntrAfter=" + updateCntrAfter + "]");
+ }
}
+ catch (IgniteCheckedException e) {
+ U.error(log, "Can't calculate partition hash [grpId=" + gctx.groupId() +
+ ", partId=" + part.id() + "]", e);
- GridIterator<CacheDataRow> it = grpCtx.offheap().partitionIterator(part.id());
-
- while (it.hasNextX()) {
- CacheDataRow row = it.nextX();
-
- partHash += row.key().hashCode();
-
- partHash += Arrays.hashCode(row.value().valueBytes(grpCtx.cacheObjectContext()));
+ throw new IgniteException("Can't calculate partition hash [grpId=" + gctx.groupId() +
+ ", partId=" + part.id() + "]", e);
}
-
- PartitionUpdateCounter updateCntrAfter = part.dataStore().partUpdateCounter();
-
- if (updateCntrAfter != null && !updateCntrAfter.equals(updateCntrBefore)) {
- throw new GridNotIdleException(GRID_NOT_IDLE_MSG + "[grpName=" + grpCtx.cacheOrGroupName() +
- ", grpId=" + grpCtx.groupId() + ", partId=" + part.id() + "] changed during size " +
- "calculation [updCntrBefore=" + updateCntrBefore + ", updCntrAfter=" + updateCntrAfter + "]");
+ finally {
+ part.release();
}
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Can't calculate partition hash [grpId=" + grpCtx.groupId() +
- ", partId=" + part.id() + "]", e);
-
- throw new IgniteException("Can't calculate partition hash [grpId=" + grpCtx.groupId() +
- ", partId=" + part.id() + "]", e);
- }
- finally {
- part.release();
- }
-
- PartitionHashRecordV2 partRec = new PartitionHashRecordV2(
- partKey, isPrimary, consId, partHash, updateCntrBefore == null ? 0 : updateCntrBefore.get(), partSize,
- PartitionState.OWNING
- );
- completionCntr.incrementAndGet();
+ completionCntr.incrementAndGet();
- return Collections.singletonMap(partKey, partRec);
+ return res;
+ });
}
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/security/SecurityPermission.java b/modules/core/src/main/java/org/apache/ignite/plugin/security/SecurityPermission.java
index 2d2f15a..b45a087 100644
--- a/modules/core/src/main/java/org/apache/ignite/plugin/security/SecurityPermission.java
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/security/SecurityPermission.java
@@ -84,7 +84,7 @@ public enum SecurityPermission {
/** Administration operation: write distributed properties values. */
ADMIN_WRITE_DISTRIBUTED_PROPERTY,
- /** Administration operation with cluster snapshots (CREATE, CANCEL). */
+ /** Administration operation with cluster snapshots (create, cancel, check). */
ADMIN_SNAPSHOT;
/** Enumerated values. */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsDataRegionMetricsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsDataRegionMetricsTest.java
index 63fea44..e70651a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsDataRegionMetricsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsDataRegionMetricsTest.java
@@ -35,7 +35,6 @@ import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
@@ -53,6 +52,7 @@ import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_DATA
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.UTILITY_CACHE_NAME;
import static org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLog.TX_LOG_CACHE_NAME;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.META_STORAGE_NAME;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_ID;
import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_NAME;
@@ -349,7 +349,7 @@ public class IgnitePdsDataRegionMetricsTest extends GridCommonAbstractTest {
File file = path.toFile();
FilePageStore store = (FilePageStore)pageStoreMgr.getStore(metaStore ?
- METASTORAGE_CACHE_ID : CU.cacheId(cacheName), partId(file));
+ METASTORAGE_CACHE_ID : CU.cacheId(cacheName), partId(file.getName()));
int pageSize = store.getPageSize();
long storeSize = path.toFile().length() - store.headerSize();
@@ -372,19 +372,4 @@ public class IgnitePdsDataRegionMetricsTest extends GridCommonAbstractTest {
assertEquals("Number of allocated pages is different than in metrics for [node=" + node.name() + ", cache=" + cacheName + "]",
totalPersistenceSize / pageStoreMgr.pageSize(), totalAllocatedPagesFromMetrics);
}
-
- /**
- * @param partFile Partition file.
- */
- private static int partId(File partFile) {
- String name = partFile.getName();
-
- if (name.equals(FilePageStoreManager.INDEX_FILE_NAME))
- return PageIdAllocator.INDEX_PARTITION;
-
- if (name.startsWith(FilePageStoreManager.PART_FILE_PREFIX))
- return Integer.parseInt(name.substring(FilePageStoreManager.PART_FILE_PREFIX.length(), name.indexOf('.')));
-
- throw new IllegalStateException("Illegal partition file name: " + name);
- }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
index 889cb18..809a23d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
@@ -82,6 +82,7 @@ import org.junit.Before;
import static java.nio.file.Files.newDirectoryStream;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.cluster.ClusterState.INACTIVE;
+import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_PAGE_SIZE;
import static org.apache.ignite.events.EventType.EVTS_CLUSTER_SNAPSHOT;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.FILE_SUFFIX;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
@@ -127,7 +128,7 @@ public abstract class AbstractSnapshotSelfTest extends GridCommonAbstractTest {
.setMaxSize(100L * 1024 * 1024)
.setPersistenceEnabled(persistence))
.setCheckpointFrequency(3000)
- .setPageSize(4096))
+ .setPageSize(DFLT_PAGE_SIZE))
.setCacheConfiguration(dfltCacheCfg)
.setClusterStateOnStart(INACTIVE)
.setIncludeEventTypes(EVTS_CLUSTER_SNAPSHOT)
@@ -495,6 +496,24 @@ public abstract class AbstractSnapshotSelfTest extends GridCommonAbstractTest {
}
/** */
+ protected static class Value {
+ /** */
+ private final byte[] arr;
+
+ /**
+ * @param arr Test array.
+ */
+ public Value(byte[] arr) {
+ this.arr = arr;
+ }
+
+ /** */
+ public byte[] arr() {
+ return arr;
+ }
+ }
+
+ /** */
protected static class DelegateSnapshotSender extends SnapshotSender {
/** Delegate call to. */
protected final SnapshotSender delegate;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
index f2b1da9..23e18f3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
@@ -22,35 +22,68 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.GridJobExecuteRequest;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.binary.BinaryContext;
+import org.apache.ignite.internal.binary.BinaryObjectImpl;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionKeyV2;
+import org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsTaskV2;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.visor.verify.CacheFilterEnum;
+import org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskArg;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Before;
import org.junit.Test;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_SNAPSHOT_DIRECTORY;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.TTL_ETERNAL;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheDirName;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFileName;
import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_METAFILE_EXT;
+import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
import static org.apache.ignite.testframework.GridTestUtils.assertContains;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -58,6 +91,18 @@ import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
* Cluster-wide snapshot check procedure tests.
*/
public class IgniteClusterSnapshotCheckTest extends AbstractSnapshotSelfTest {
+ /** Map of intermediate compute task results collected prior performing reduce operation on them. */
+ private final Map<Class<?>, Map<PartitionKeyV2, List<PartitionHashRecordV2>>> jobResults = new ConcurrentHashMap<>();
+
+ /** Partition id used for tests. */
+ private static final int PART_ID = 0;
+
+ /** Cleanup data of task execution results if need. */
+ @Before
+ public void beforeCheck() {
+ jobResults.clear();
+ }
+
/** @throws Exception If fails. */
@Test
public void testClusterSnapshotCheck() throws Exception {
@@ -183,14 +228,14 @@ public class IgniteClusterSnapshotCheckTest extends AbstractSnapshotSelfTest {
ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
Path part0 = U.searchFileRecursively(snp(ignite).snapshotLocalDir(SNAPSHOT_NAME).toPath(),
- getPartitionFileName(0));
+ getPartitionFileName(PART_ID));
assertNotNull(part0);
assertTrue(part0.toString(), part0.toFile().exists());
try (FilePageStore pageStore = (FilePageStore)((FilePageStoreManager)ignite.context().cache().context().pageStore())
.getPageStoreFactory(CU.cacheId(dfltCacheCfg.getName()), false)
- .createPageStore(getTypeByPartId(0),
+ .createPageStore(getTypeByPartId(PART_ID),
() -> part0,
val -> {
})
@@ -220,7 +265,8 @@ public class IgniteClusterSnapshotCheckTest extends AbstractSnapshotSelfTest {
res.print(b::append, true);
assertTrue(F.isEmpty(res.exceptions()));
- assertContains(log, b.toString(), "The check procedure has finished, found 1 conflict partitions");
+ assertContains(log, b.toString(),
+ "The check procedure has finished, found 1 conflict partitions: [counterConflicts=1, hashConflicts=0]");
}
/** @throws Exception If fails. */
@@ -284,11 +330,11 @@ public class IgniteClusterSnapshotCheckTest extends AbstractSnapshotSelfTest {
ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
Path part0 = U.searchFileRecursively(snp(ignite).snapshotLocalDir(SNAPSHOT_NAME).toPath(),
- getPartitionFileName(0));
+ getPartitionFileName(PART_ID));
try (FilePageStore pageStore = (FilePageStore)((FilePageStoreManager)ignite.context().cache().context().pageStore())
.getPageStoreFactory(CU.cacheId(dfltCacheCfg.getName()), false)
- .createPageStore(getTypeByPartId(0),
+ .createPageStore(getTypeByPartId(PART_ID),
() -> part0,
val -> {
})
@@ -317,4 +363,178 @@ public class IgniteClusterSnapshotCheckTest extends AbstractSnapshotSelfTest {
Exception ex = res.exceptions().values().iterator().next();
assertTrue(X.hasCause(ex, IgniteDataIntegrityViolationException.class));
}
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testClusterSnapshotCheckFailsOnPartitionDataDiffers() throws Exception {
+ CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+ IgniteEx ignite = startGridsWithoutCache(2);
+
+ ignite.getOrCreateCache(ccfg).put(1, new Value(new byte[2000]));
+
+ forceCheckpoint(ignite);
+
+ GridCacheSharedContext<?, ?> cctx = ignite.context().cache().context();
+ GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)cctx.database();
+
+ BinaryContext binCtx = ((CacheObjectBinaryProcessorImpl)ignite.context().cacheObjects()).binaryContext();
+
+ GridCacheAdapter<?, ?> cache = ignite.context().cache().internalCache(dfltCacheCfg.getName());
+ long partCtr = cache.context().offheap().lastUpdatedPartitionCounter(PART_ID);
+ AtomicBoolean done = new AtomicBoolean();
+
+ db.addCheckpointListener(new CheckpointListener() {
+ @Override public void onMarkCheckpointBegin(Context ctx) throws IgniteCheckedException {
+ // Change the cache value only at on of the cluster node to get hash conflict when the check command ends.
+ if (!done.compareAndSet(false, true))
+ return;
+
+ GridIterator<CacheDataRow> it = cache.context().offheap().partitionIterator(PART_ID);
+
+ assertTrue(it.hasNext());
+
+ CacheDataRow row0 = it.nextX();
+
+ AffinityTopologyVersion topVer = cctx.exchange().readyAffinityVersion();
+ GridCacheEntryEx cached = cache.entryEx(row0.key(), topVer);
+
+ byte[] bytes = new byte[2000];
+ new Random().nextBytes(bytes);
+
+ try {
+ BinaryObjectImpl newVal = new BinaryObjectImpl(binCtx, binCtx.marshaller().marshal(new Value(bytes)), 0);
+
+ boolean success = cached.initialValue(
+ newVal,
+ new GridCacheVersion(row0.version().topologyVersion(),
+ row0.version().nodeOrder(),
+ row0.version().order() + 1),
+ null,
+ null,
+ TxState.NA,
+ TxState.NA,
+ TTL_ETERNAL,
+ row0.expireTime(),
+ true,
+ topVer,
+ DR_NONE,
+ false,
+ null);
+
+ assertTrue(success);
+
+ long newPartCtr = cache.context().offheap().lastUpdatedPartitionCounter(PART_ID);
+
+ assertEquals(newPartCtr, partCtr);
+ }
+ catch (Exception e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+
+ @Override public void onCheckpointBegin(Context ctx) throws IgniteCheckedException {
+
+ }
+
+ @Override public void beforeCheckpointBegin(Context ctx) throws IgniteCheckedException {
+
+ }
+ });
+
+ db.waitForCheckpoint("test-checkpoint");
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ Path part0 = U.searchFileRecursively(snp(ignite).snapshotLocalDir(SNAPSHOT_NAME).toPath(),
+ getPartitionFileName(PART_ID));
+
+ assertNotNull(part0);
+ assertTrue(part0.toString(), part0.toFile().exists());
+
+ IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME).get();
+
+ StringBuilder b = new StringBuilder();
+ res.print(b::append, true);
+
+ assertTrue(F.isEmpty(res.exceptions()));
+ assertContains(log, b.toString(),
+ "The check procedure has finished, found 1 conflict partitions: [counterConflicts=0, hashConflicts=1]");
+ }
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testClusterSnapshotCheckHashesSameAsIdleVerifyHashes() throws Exception {
+ Random rnd = new Random();
+ CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new CacheConfiguration<>(DEFAULT_CACHE_NAME));
+
+ IgniteEx ignite = startGridsWithCache(1, CACHE_KEYS_RANGE, k -> new Value(new byte[rnd.nextInt(32768)]), ccfg);
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ IdleVerifyResultV2 idleVerifyRes = ignite.compute().execute(new TestVisorBackupPartitionsTask(),
+ new VisorIdleVerifyTaskArg(new HashSet<>(Collections.singletonList(ccfg.getName())),
+ new HashSet<>(),
+ false,
+ CacheFilterEnum.USER,
+ true));
+
+ IdleVerifyResultV2 snpVerifyRes = ignite.compute().execute(new TestSnapshotPartitionsVerifyTask(),
+ Collections.singletonMap(ignite.cluster().localNode(),
+ Collections.singletonList(snp(ignite).readSnapshotMetadata(SNAPSHOT_NAME, (String)ignite.configuration().getConsistentId()))));
+
+ Map<PartitionKeyV2, List<PartitionHashRecordV2>> idleVerifyHashes = jobResults.get(TestVisorBackupPartitionsTask.class);
+ Map<PartitionKeyV2, List<PartitionHashRecordV2>> snpCheckHashes = jobResults.get(TestVisorBackupPartitionsTask.class);
+
+ assertFalse(F.isEmpty(idleVerifyHashes));
+ assertFalse(F.isEmpty(snpCheckHashes));
+
+ assertEquals(idleVerifyHashes, snpCheckHashes);
+ assertEquals(idleVerifyRes, snpVerifyRes);
+ }
+
+ /**
+ * @param cls Class of running task.
+ * @param results Results of compute.
+ */
+ private void saveHashes(Class<?> cls, List<ComputeJobResult> results) {
+ Map<PartitionKeyV2, List<PartitionHashRecordV2>> hashes = new HashMap<>();
+
+ for (ComputeJobResult job : results) {
+ if (job.getException() != null)
+ continue;
+
+ job.<Map<PartitionKeyV2, PartitionHashRecordV2>>getData().forEach((k, v) ->
+ hashes.computeIfAbsent(k, k0 -> new ArrayList<>()).add(v));
+ }
+
+ Object mustBeNull = jobResults.putIfAbsent(cls, hashes);
+
+ assertNull(mustBeNull);
+ }
+
+ /** */
+ private class TestVisorBackupPartitionsTask extends VerifyBackupPartitionsTaskV2 {
+ /** {@inheritDoc} */
+ @Override public @Nullable IdleVerifyResultV2 reduce(List<ComputeJobResult> results) throws IgniteException {
+ IdleVerifyResultV2 res = super.reduce(results);
+
+ saveHashes(TestVisorBackupPartitionsTask.class, results);
+
+ return res;
+ }
+ }
+
+ /** Test compute task to collect partition data hashes when the snapshot check procedure ends. */
+ private class TestSnapshotPartitionsVerifyTask extends SnapshotPartitionsVerifyTask {
+ /** {@inheritDoc} */
+ @Override public @Nullable IdleVerifyResultV2 reduce(List<ComputeJobResult> results) throws IgniteException {
+ IdleVerifyResultV2 res = super.reduce(results);
+
+ saveHashes(TestSnapshotPartitionsVerifyTask.class, results);
+
+ return res;
+ }
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
index d91c6ac..d0a943d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
@@ -25,15 +25,16 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Random;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
+import java.util.function.BiFunction;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
@@ -41,8 +42,9 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.CheckpointState;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
@@ -50,26 +52,33 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileVersionCheckingFactory;
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
+import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_PAGE_SIZE;
import static org.apache.ignite.internal.MarshallerContextImpl.mappingFileStoreWorkDir;
import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.binaryWorkDir;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheDirName;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.CP_SNAPSHOT_REASON;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
/**
* Default snapshot manager test.
*/
public class IgniteSnapshotManagerSelfTest extends AbstractSnapshotSelfTest {
+ /** The size of value array to fit 3 pages. */
+ private static final int SIZE_FOR_FIT_3_PAGES = 12008;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -388,31 +397,150 @@ public class IgniteSnapshotManagerSelfTest extends AbstractSnapshotSelfTest {
snpFut.get(5_000, TimeUnit.MILLISECONDS);
}
- /**
- * @param src Source node to calculate.
- * @param grps Groups to collect owning parts.
- * @param rmtNodeId Remote node id.
- * @return Map of collected parts.
- */
- private static Map<Integer, Set<Integer>> owningParts(IgniteEx src, Set<Integer> grps, UUID rmtNodeId) {
- Map<Integer, Set<Integer>> result = new HashMap<>();
-
- for (Integer grpId : grps) {
- Set<Integer> parts = src.context()
- .cache()
- .cacheGroup(grpId)
- .topology()
- .partitions(rmtNodeId)
- .entrySet()
- .stream()
- .filter(p -> p.getValue() == GridDhtPartitionState.OWNING)
- .map(Map.Entry::getKey)
- .collect(Collectors.toSet());
-
- result.put(grpId, parts);
+ /** @throws Exception If fails. */
+ @Test
+ public void testSnapshotIteratorRandomizedLoader() throws Exception {
+ Random rnd = new Random();
+ int maxKey = 15_000;
+ int maxValSize = 32_768;
+ int loadingTimeMs = 30_000;
+
+ CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new CacheConfiguration<Integer, Value>("tx1"))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+ IgniteEx ignite = startGridsWithCache(1, CACHE_KEYS_RANGE, k -> new Value(new byte[1024]), ccfg);
+
+ IgniteCache<Integer, Value> cache = ignite.cache(ccfg.getName());
+
+ long startTime = U.currentTimeMillis();
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(() -> {
+ while (!Thread.currentThread().isInterrupted() && startTime + loadingTimeMs > U.currentTimeMillis()) {
+ if (rnd.nextBoolean())
+ cache.put(rnd.nextInt(maxKey), new Value(new byte[rnd.nextInt(maxValSize)]));
+ else
+ cache.remove(rnd.nextInt(maxKey));
+ }
+
+ }, 10, "change-loader-");
+
+ fut.get();
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ Map<Integer, Value> iterated = new HashMap<>();
+
+ try (GridCloseableIterator<CacheDataRow> iter = snp(ignite).partitionRowIterator(SNAPSHOT_NAME,
+ ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+ ccfg.getName(),
+ 0)
+ ) {
+ CacheObjectContext coctx = ignite.cachex(ccfg.getName()).context().cacheObjectContext();
+
+ while (iter.hasNext()) {
+ CacheDataRow row = iter.next();
+
+ iterated.put(row.key().value(coctx, true), row.value().value(coctx, true));
+ }
}
- return result;
+ stopAllGrids();
+
+ IgniteEx snpIgnite = startGridsFromSnapshot(1, SNAPSHOT_NAME);
+
+ IgniteCache<Integer, Value> snpCache = snpIgnite.cache(ccfg.getName());
+
+ assertEquals(snpCache.size(CachePeekMode.PRIMARY), iterated.size());
+ snpCache.forEach(e -> {
+ Value val = iterated.remove(e.getKey());
+
+ assertNotNull(val);
+ assertEquals(val.arr().length, e.getValue().arr().length);
+ });
+
+ assertTrue(iterated.isEmpty());
+ }
+
+ /** @throws Exception If fails */
+ @Test
+ public void testSnapshotIterator() throws Exception {
+ int keys = 127;
+
+ IgniteEx ignite = startGridsWithCache(2,
+ dfltCacheCfg.setAffinity(new RendezvousAffinityFunction(false, 1)), keys);
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ int rows = 0;
+
+ try (GridCloseableIterator<CacheDataRow> iter = snp(ignite).partitionRowIterator(SNAPSHOT_NAME,
+ ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+ dfltCacheCfg.getName(),
+ 0)
+ ) {
+ CacheObjectContext coctx = ignite.cachex(dfltCacheCfg.getName()).context().cacheObjectContext();
+
+ while (iter.hasNext()) {
+ CacheDataRow row = iter.next();
+
+ // Invariant for cache: cache key always equals to cache value.
+ assertEquals("Invalid key/value pair [key=" + row.key() + ", val=" + row.value() + ']',
+ row.key().value(coctx, false, U.resolveClassLoader(ignite.configuration())),
+ (Integer)row.value().value(coctx, false));
+
+ rows++;
+ }
+ }
+
+ assertEquals("Invalid number of rows: " + rows, keys, rows);
+ }
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testSnapshotIteratorLargeRows() throws Exception {
+ int keys = 2;
+ CacheConfiguration<Integer, Value> ccfg = txCacheConfig(new CacheConfiguration<Integer, Value>(DEFAULT_CACHE_NAME))
+ .setAffinity(new RendezvousAffinityFunction(false, 1));
+
+ IgniteEx ignite = startGridsWithoutCache(2);
+
+ assertEquals(DFLT_PAGE_SIZE, ignite.configuration().getDataStorageConfiguration().getPageSize());
+
+ for (int i = 0; i < keys; i++)
+ ignite.getOrCreateCache(ccfg).put(i, new Value(new byte[SIZE_FOR_FIT_3_PAGES]));
+
+ forceCheckpoint();
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ int rows = 0;
+
+ try (GridCloseableIterator<CacheDataRow> iter = snp(ignite).partitionRowIterator(SNAPSHOT_NAME,
+ ignite.context().pdsFolderResolver().resolveFolders().folderName(),
+ dfltCacheCfg.getName(),
+ 0)
+ ) {
+ CacheObjectContext coctx = ignite.cachex(dfltCacheCfg.getName()).context().cacheObjectContext();
+
+ while (iter.hasNext()) {
+ CacheDataRow row = iter.next();
+
+ assertEquals(SIZE_FOR_FIT_3_PAGES, ((Value)row.value().value(coctx, false)).arr().length);
+ assertTrue((Integer)row.key().value(coctx, false, null) < 2);
+
+ rows++;
+ }
+ }
+
+ assertEquals("Invalid number of rows: " + rows, keys, rows);
+ }
+
+ /**
+ * @param ignite Ignite instance to set factory.
+ * @param factory New factory to use.
+ */
+ private static void snapshotStoreFactory(IgniteEx ignite, BiFunction<Integer, Boolean, FileVersionCheckingFactory> factory) {
+ setFieldValue(snp(ignite), "storeFactory", factory);
}
/**