You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/08/04 09:14:58 UTC
[ignite-3] branch main updated: IGNITE-16657 [Native Persistence 3.0] Implement partitions chunks merger (#972)
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d05837ad1 IGNITE-16657 [Native Persistence 3.0] Implement partitions chunks merger (#972)
d05837ad1 is described below
commit d05837ad1dcb5ce5e0773714c7d0ebb8b067c287
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Thu Aug 4 12:14:53 2022 +0300
IGNITE-16657 [Native Persistence 3.0] Implement partitions chunks merger (#972)
---
.../PageMemoryCheckpointConfigurationSchema.java | 7 +-
.../persistence/checkpoint/CheckpointManager.java | 28 +-
.../checkpoint/CheckpointPagesWriter.java | 12 +-
.../checkpoint/CheckpointPagesWriterFactory.java | 6 +-
.../persistence/checkpoint/Checkpointer.java | 19 +-
.../persistence/compaction/Compactor.java | 370 +++++++++++++++++++++
.../persistence/store/AbstractFilePageStoreIo.java | 10 +-
.../persistence/store/DeltaFilePageStoreIo.java | 62 +++-
.../persistence/store/FilePageStore.java | 59 +++-
.../persistence/store/FilePageStoreIo.java | 7 +
.../persistence/store/FilePageStoreManager.java | 48 ++-
.../store/PageReadWriteManagerImpl.java | 96 ------
.../pagememory/persistence/store/PageStore.java | 3 +-
.../persistence/checkpoint/CheckpointerTest.java | 13 +-
.../persistence/compaction/CompactorTest.java | 206 ++++++++++++
.../store/AbstractFilePageStoreIoTest.java | 2 +-
.../store/DeltaFilePageStoreIoTest.java | 60 ++++
.../store/FilePageStoreManagerTest.java | 19 ++
.../persistence/store/FilePageStoreTest.java | 92 +++++
.../store/PageReadWriteManagerImplTest.java | 84 -----
.../PersistentPageMemoryTableStorage.java | 6 +
21 files changed, 981 insertions(+), 228 deletions(-)
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java
index b80a18a30..2352b360a 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java
@@ -39,7 +39,12 @@ public class PageMemoryCheckpointConfigurationSchema {
/** Number of checkpoint threads. */
@Range(min = 1)
@Value(hasDefault = true)
- public int threads = 4;
+ public int checkpointThreads = 4;
+
+ /** Number of threads to compact delta files. */
+ @Range(min = 1)
+ @Value(hasDefault = true)
+ public int compactionThreads = 4;
/** Timeout for checkpoint read lock acquisition in milliseconds. */
@Range(min = 0)
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
index d94782e49..57e764058 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesView;
+import org.apache.ignite.internal.pagememory.persistence.compaction.Compactor;
import org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo;
import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
@@ -76,6 +77,9 @@ public class CheckpointManager {
/** File page store manager. */
private final FilePageStoreManager filePageStoreManager;
+ /** Delta file compactor. */
+ private final Compactor compactor;
+
/**
* Constructor.
*
@@ -133,6 +137,15 @@ public class CheckpointManager {
pageSize
);
+ compactor = new Compactor(
+ Loggers.forClass(Compactor.class),
+ igniteInstanceName,
+ workerListener,
+ checkpointConfig.compactionThreads(),
+ filePageStoreManager,
+ pageSize
+ );
+
checkpointer = new Checkpointer(
Loggers.forClass(Checkpoint.class),
igniteInstanceName,
@@ -141,6 +154,7 @@ public class CheckpointManager {
checkpointWorkflow,
checkpointPagesWriterFactory,
filePageStoreManager,
+ compactor,
checkpointConfig
);
@@ -162,6 +176,8 @@ public class CheckpointManager {
checkpointer.start();
checkpointTimeoutLock.start();
+
+ compactor.start();
}
/**
@@ -171,7 +187,8 @@ public class CheckpointManager {
IgniteUtils.closeAll(
checkpointTimeoutLock::stop,
checkpointer::stop,
- checkpointWorkflow::stop
+ checkpointWorkflow::stop,
+ compactor::stop
);
}
@@ -285,4 +302,13 @@ public class CheckpointManager {
return pageIndexes;
}
+
+ /**
+ * Adds the number of delta files to compact.
+ *
+ * @param count Number of delta files.
+ */
+ public void addDeltaFileCountForCompaction(int count) {
+ compactor.addDeltaFiles(count);
+ }
}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
index 6da756a10..29adbde0f 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
@@ -75,8 +75,8 @@ public class CheckpointPagesWriter implements Runnable {
/** Future which should be finished when all pages would be written. */
private final CompletableFuture<?> doneFut;
- /** Some action which will be executed every time before page will be written. */
- private final Runnable beforePageWrite;
+ /** Update heartbeat callback. */
+ private final Runnable updateHeartbeat;
/** Thread local with buffers for the checkpoint threads. Each buffer represent one page for durable memory. */
private final ThreadLocal<ByteBuffer> threadBuf;
@@ -103,7 +103,7 @@ public class CheckpointPagesWriter implements Runnable {
* @param writePageIds Queue of dirty page IDs to write.
* @param updatedPartitions Updated partitions.
* @param doneFut Done future.
- * @param beforePageWrite Action to be performed before every page write.
+ * @param updateHeartbeat Update heartbeat callback.
* @param log Logger.
* @param threadBuf Thread local byte buffer.
* @param checkpointProgress Checkpoint progress.
@@ -118,7 +118,7 @@ public class CheckpointPagesWriter implements Runnable {
IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> writePageIds,
ConcurrentMap<GroupPartitionId, LongAdder> updatedPartitions,
CompletableFuture<?> doneFut,
- Runnable beforePageWrite,
+ Runnable updateHeartbeat,
ThreadLocal<ByteBuffer> threadBuf,
CheckpointProgressImpl checkpointProgress,
WriteDirtyPage pageWriter,
@@ -131,7 +131,7 @@ public class CheckpointPagesWriter implements Runnable {
this.writePageIds = writePageIds;
this.updatedPartitions = updatedPartitions;
this.doneFut = doneFut;
- this.beforePageWrite = beforePageWrite;
+ this.updateHeartbeat = updateHeartbeat;
this.threadBuf = threadBuf;
this.checkpointProgress = checkpointProgress;
this.pageWriter = pageWriter;
@@ -183,7 +183,7 @@ public class CheckpointPagesWriter implements Runnable {
AtomicBoolean writeMetaPage = new AtomicBoolean();
while (!shutdownNow.getAsBoolean() && writePageIds.next(queueResult)) {
- beforePageWrite.run();
+ updateHeartbeat.run();
FullPageId fullId = queueResult.getValue();
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
index 831cbc0e4..ee8d15b20 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
@@ -91,7 +91,7 @@ public class CheckpointPagesWriterFactory {
* @param dirtyPageIdQueue Checkpoint dirty page ID queue to write.
* @param updatedPartitions Updated partitions.
* @param doneWriteFut Write done future.
- * @param beforePageWrite Before page write callback.
+ * @param updateHeartbeat Update heartbeat callback.
* @param checkpointProgress Current checkpoint data.
* @param shutdownNow Checker of stop operation.
*/
@@ -100,7 +100,7 @@ public class CheckpointPagesWriterFactory {
IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> dirtyPageIdQueue,
ConcurrentMap<GroupPartitionId, LongAdder> updatedPartitions,
CompletableFuture<?> doneWriteFut,
- Runnable beforePageWrite,
+ Runnable updateHeartbeat,
CheckpointProgressImpl checkpointProgress,
// TODO: IGNITE-16993 Consider a lock replacement
BooleanSupplier shutdownNow
@@ -111,7 +111,7 @@ public class CheckpointPagesWriterFactory {
dirtyPageIdQueue,
updatedPartitions,
doneWriteFut,
- beforePageWrite,
+ updateHeartbeat,
threadBuf,
checkpointProgress,
dirtyPageWriter,
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
index f8569013a..1725816dd 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
@@ -36,7 +36,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.LongAdder;
@@ -48,6 +47,7 @@ import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryChec
import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointView;
import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.internal.pagememory.persistence.compaction.Compactor;
import org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo;
import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
@@ -139,6 +139,9 @@ public class Checkpointer extends IgniteWorker {
/** File page store manager. */
private final FilePageStoreManager filePageStoreManager;
+ /** Delta file compactor. */
+ private final Compactor compactor;
+
/**
* Constructor.
*
@@ -149,6 +152,7 @@ public class Checkpointer extends IgniteWorker {
* @param checkpointWorkFlow Implementation of checkpoint.
* @param factory Page writer factory.
* @param filePageStoreManager File page store manager.
+ * @param compactor Delta file compactor.
* @param checkpointConfig Checkpoint configuration.
*/
Checkpointer(
@@ -159,6 +163,7 @@ public class Checkpointer extends IgniteWorker {
CheckpointWorkflow checkpointWorkFlow,
CheckpointPagesWriterFactory factory,
FilePageStoreManager filePageStoreManager,
+ Compactor compactor,
PageMemoryCheckpointConfiguration checkpointConfig
) {
super(log, igniteInstanceName, "checkpoint-thread", workerListener);
@@ -168,10 +173,11 @@ public class Checkpointer extends IgniteWorker {
this.checkpointWorkflow = checkpointWorkFlow;
this.checkpointPagesWriterFactory = factory;
this.filePageStoreManager = filePageStoreManager;
+ this.compactor = compactor;
scheduledCheckpointProgress = new CheckpointProgressImpl(MILLISECONDS.toNanos(nextCheckpointInterval()));
- int checkpointWritePageThreads = checkpointConfig.threads().value();
+ int checkpointWritePageThreads = checkpointConfig.checkpointThreads().value();
if (checkpointWritePageThreads > 1) {
checkpointWritePagesPool = new ThreadPoolExecutor(
@@ -405,12 +411,7 @@ public class Checkpointer extends IgniteWorker {
if (pageWritePool == null) {
write.run();
} else {
- try {
- pageWritePool.execute(write);
- } catch (RejectedExecutionException ignore) {
- // Run the task synchronously.
- write.run();
- }
+ pageWritePool.execute(write);
}
}
@@ -431,6 +432,8 @@ public class Checkpointer extends IgniteWorker {
syncUpdatedPageStores(updatedPartitions);
+ compactor.addDeltaFiles(updatedPartitions.size());
+
if (shutdownNow.getAsBoolean()) {
currentCheckpointProgress.fail(new NodeStoppingException("Node is stopping."));
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
new file mode 100644
index 000000000..bf9f285ee
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.compaction;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.stream.Collectors.toCollection;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.pagememory.io.PageIo;
+import org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo;
+import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
+import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
+import org.apache.ignite.internal.thread.IgniteThread;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.worker.IgniteWorker;
+import org.apache.ignite.internal.util.worker.IgniteWorkerListener;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Entity to compact delta files.
+ *
+ * <p>To start compacting delta files, you need to notify about the appearance of {@link #addDeltaFiles(int) delta files ready for
+ * compaction}. Then all delta files {@link FilePageStore#getDeltaFileToCompaction() ready for compaction} will be collected and merged with
+ * their {@link FilePageStore file page stores} until all delta files are compacted.
+ *
+ * <p>Delta file compaction process consists of:
+ * <ul>
+ * <li>Copying pages from a delta file to a partition file.</li>
+ * <li>Fsync of the partition file.</li>
+ * <li>Removal of the delta file from {@link FilePageStore} and the file system.</li>
+ * </ul>
+ */
+public class Compactor extends IgniteWorker {
+ private final Object mux = new Object();
+
+ private final @Nullable ThreadPoolExecutor threadPoolExecutor;
+
+ private final AtomicInteger deltaFileCount = new AtomicInteger();
+
+ private final FilePageStoreManager filePageStoreManager;
+
+ /** Thread local with buffers for the compaction threads. */
+ private final ThreadLocal<ByteBuffer> threadBuf;
+
+ /**
+ * Creates a new delta file compactor worker with the given parameters.
+ *
+ * @param log Logger.
+ * @param igniteInstanceName Name of the Ignite instance this runnable is used in.
+ * @param listener Listener for life-cycle events.
+ * @param threads Number of compaction threads.
+ * @param filePageStoreManager File page store manager.
+ * @param pageSize Page size in bytes.
+ */
+ public Compactor(
+ IgniteLogger log,
+ String igniteInstanceName,
+ @Nullable IgniteWorkerListener listener,
+ ConfigurationValue<Integer> threads,
+ FilePageStoreManager filePageStoreManager,
+ int pageSize
+ ) {
+ super(log, igniteInstanceName, "compaction-thread", listener);
+
+ this.filePageStoreManager = filePageStoreManager;
+
+ threadBuf = ThreadLocal.withInitial(() -> {
+ ByteBuffer tmpWriteBuf = ByteBuffer.allocateDirect(pageSize);
+
+ tmpWriteBuf.order(ByteOrder.nativeOrder());
+
+ return tmpWriteBuf;
+ });
+
+ int threadCount = threads.value();
+
+ if (threadCount > 1) {
+ threadPoolExecutor = new ThreadPoolExecutor(
+ threadCount,
+ threadCount,
+ 30_000,
+ MILLISECONDS,
+ new LinkedBlockingQueue<>(),
+ new NamedThreadFactory("compaction-runner-io", log)
+ );
+ } else {
+ threadPoolExecutor = null;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void body() throws InterruptedException {
+ try {
+ while (!isCancelled()) {
+ waitDeltaFiles();
+
+ if (isCancelled()) {
+ log.info("Skipping the delta file compaction because the node is stopping");
+
+ return;
+ }
+
+ doCompaction();
+ }
+ } catch (Throwable t) {
+ // TODO: IGNITE-16899 By analogy with 2.0, we need to handle the exception (err) by the FailureProcessor
+
+ throw new IgniteInternalException(t);
+ }
+ }
+
+ /**
+ * Waits until delta files are available for compaction or the worker is cancelled.
+ */
+ void waitDeltaFiles() {
+ try {
+ synchronized (mux) {
+ while (deltaFileCount.get() == 0 && !isCancelled()) {
+ blockingSectionBegin();
+
+ try {
+ mux.wait();
+ } finally {
+ blockingSectionEnd();
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ isCancelled.set(true);
+ }
+ }
+
+ /**
+ * Adds the number of delta files to compact.
+ *
+ * @param count Number of delta files.
+ */
+ public void addDeltaFiles(int count) {
+ assert count >= 0;
+
+ if (count > 0) {
+ deltaFileCount.addAndGet(count);
+
+ synchronized (mux) {
+ mux.notifyAll();
+ }
+ }
+ }
+
+ /**
+ * Merges delta files with partition files.
+ */
+ void doCompaction() {
+ // Let's collect one delta file for each partition.
+ Queue<IgniteBiTuple<FilePageStore, DeltaFilePageStoreIo>> queue = filePageStoreManager.allPageStores().stream()
+ .flatMap(List::stream)
+ .map(filePageStore -> {
+ DeltaFilePageStoreIo deltaFileToCompaction = filePageStore.getDeltaFileToCompaction();
+
+ return deltaFileToCompaction == null ? null : new IgniteBiTuple<>(filePageStore, deltaFileToCompaction);
+ })
+ .filter(Objects::nonNull)
+ .collect(toCollection(ConcurrentLinkedQueue::new));
+
+ assert !queue.isEmpty();
+
+ updateHeartbeat();
+
+ int threads = threadPoolExecutor == null ? 1 : threadPoolExecutor.getMaximumPoolSize();
+
+ CompletableFuture<?>[] futures = new CompletableFuture[threads];
+
+ for (int i = 0; i < threads; i++) {
+ CompletableFuture<?> future = futures[i] = new CompletableFuture<>();
+
+ Runnable merger = () -> {
+ IgniteBiTuple<FilePageStore, DeltaFilePageStoreIo> toMerge;
+
+ try {
+ while ((toMerge = queue.poll()) != null) {
+ mergeDeltaFileToMainFile(toMerge.get1(), toMerge.get2());
+ }
+ } catch (Throwable ex) {
+ future.completeExceptionally(ex);
+ }
+
+ future.complete(null);
+ };
+
+ if (isCancelled()) {
+ return;
+ }
+
+ if (threadPoolExecutor == null) {
+ merger.run();
+ } else {
+ threadPoolExecutor.execute(merger);
+ }
+ }
+
+ updateHeartbeat();
+
+ // Wait and check for errors.
+ CompletableFuture.allOf(futures).join();
+ }
+
+ /**
+ * Starts the compactor.
+ */
+ public void start() {
+ if (runner() != null) {
+ return;
+ }
+
+ assert runner() == null : "Compacter is running";
+
+ new IgniteThread(this).start();
+ }
+
+ /**
+ * Stops the compactor.
+ */
+ public void stop() throws Exception {
+ cancel();
+
+ boolean interrupt = false;
+
+ while (true) {
+ try {
+ join();
+
+ break;
+ } catch (InterruptedException e) {
+ interrupt = true;
+ }
+ }
+
+ if (interrupt) {
+ Thread.currentThread().interrupt();
+ }
+
+ if (threadPoolExecutor != null) {
+ IgniteUtils.shutdownAndAwaitTermination(threadPoolExecutor, 2, TimeUnit.MINUTES);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void cancel() {
+ if (log.isDebugEnabled()) {
+ log.debug("Cancelling grid runnable: " + this);
+ }
+
+ // Do not interrupt runner thread.
+ isCancelled.set(true);
+
+ synchronized (mux) {
+ mux.notifyAll();
+ }
+ }
+
+ /**
+ * Merges the main file page store with the delta file page store.
+ *
+ * <p>Steps:
+ * <ul>
+ * <li>Copy pages from delta file page store to file page store.</li>
+ * <li>Fsync the file page store.</li>
+ * <li>Remove the delta file page store from the file page store.</li>
+ * </ul>
+ *
+ * @param filePageStore File page store.
+ * @param deltaFilePageStore Delta file page store.
+ * @throws Throwable If failed.
+ */
+ void mergeDeltaFileToMainFile(
+ FilePageStore filePageStore,
+ DeltaFilePageStoreIo deltaFilePageStore
+ ) throws Throwable {
+ // Copy pages deltaFilePageStore -> filePageStore.
+ ByteBuffer buffer = threadBuf.get();
+
+ for (long pageIndex : deltaFilePageStore.pageIndexes()) {
+ updateHeartbeat();
+
+ if (isCancelled()) {
+ return;
+ }
+
+ long pageOffset = deltaFilePageStore.pageOffset(pageIndex);
+
+ // pageIndex instead of pageId, only for debugging in case of errors
+ // since we do not know the pageId until we read it from the pageOffset.
+ boolean read = deltaFilePageStore.readWithMergedToFilePageStoreCheck(pageIndex, pageOffset, buffer.rewind(), false);
+
+ assert read : deltaFilePageStore.filePath();
+
+ long pageId = PageIo.getPageId(buffer.rewind());
+
+ assert pageId != 0 : deltaFilePageStore.filePath();
+
+ updateHeartbeat();
+
+ if (isCancelled()) {
+ return;
+ }
+
+ filePageStore.write(pageId, buffer.rewind(), true);
+ }
+
+ // Fsync the file page store.
+ updateHeartbeat();
+
+ if (isCancelled()) {
+ return;
+ }
+
+ filePageStore.sync();
+
+ // Removing the delta file page store from a file page store.
+ updateHeartbeat();
+
+ if (isCancelled()) {
+ return;
+ }
+
+ boolean removed = filePageStore.removeDeltaFile(deltaFilePageStore);
+
+ assert removed : filePageStore.filePath();
+
+ deltaFilePageStore.markMergedToFilePageStore();
+
+ deltaFilePageStore.stop(true);
+
+ deltaFileCount.decrementAndGet();
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIo.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIo.java
index 9c1319938..7043f90ab 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIo.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIo.java
@@ -64,7 +64,11 @@ public abstract class AbstractFilePageStoreIo implements Closeable {
/** Initialized file page store IO. */
private volatile boolean initialized;
- /** Caches the existence state of file. After it is initialized, it will be not {@code null} during lifecycle. */
+ /**
+ * Caches the existence state of the file. Once initialized, it is never {@code null} for the rest of the lifecycle.
+ *
+ * <p>Guarded by {@link #readWriteLock}.
+ */
private @Nullable Boolean fileExists;
/**
@@ -140,7 +144,7 @@ public abstract class AbstractFilePageStoreIo implements Closeable {
* @param keepCrc By default, reading zeroes CRC which was on page store, but you can keep it in {@code pageBuf} if set {@code true}.
* @throws IgniteInternalCheckedException If reading failed (IO error occurred).
*/
- public void read(long pageId, long pageOff, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException {
+ protected void read(long pageId, long pageOff, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException {
read0(pageId, pageOff, pageBuf, !skipCrc, keepCrc);
}
@@ -513,8 +517,6 @@ public abstract class AbstractFilePageStoreIo implements Closeable {
if (keepCrc) {
PageIo.setCrc(pageBuf, savedCrc32);
}
-
- return;
} catch (IOException e) {
throw new IgniteInternalCheckedException("Failed to read page [file=" + filePath + ", pageId=" + pageId + "]", e);
}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIo.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIo.java
index 2fd974a82..7d5256a16 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIo.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIo.java
@@ -31,6 +31,8 @@ import java.nio.ByteBuffer;
import java.nio.file.Path;
import org.apache.ignite.internal.fileio.FileIo;
import org.apache.ignite.internal.fileio.FileIoFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
/**
* Implementation of the class for working with the delta file page storage IO.
@@ -38,6 +40,9 @@ import org.apache.ignite.internal.fileio.FileIoFactory;
public class DeltaFilePageStoreIo extends AbstractFilePageStoreIo {
private final DeltaFilePageStoreIoHeader header;
+ /** Lock to prevent reads after merging with a file page store. */
+ private final IgniteSpinBusyLock mergedBusyLock = new IgniteSpinBusyLock();
+
/**
* Constructor.
*
@@ -95,7 +100,16 @@ public class DeltaFilePageStoreIo extends AbstractFilePageStoreIo {
*/
@Override
public long pageOffset(long pageId) {
- int searchResult = binarySearch(header.pageIndexes(), pageIndex(pageId));
+ return pageOffset(pageIndex(pageId));
+ }
+
+ /**
+ * Returns the page offset within the store file, or {@code -1} if the page is not found in the delta file.
+ *
+ * @param pageIdx Page index.
+ */
+ public long pageOffset(int pageIdx) {
+ int searchResult = binarySearch(header.pageIndexes(), pageIdx);
if (searchResult < 0) {
return -1;
@@ -104,10 +118,56 @@ public class DeltaFilePageStoreIo extends AbstractFilePageStoreIo {
return (long) searchResult * pageSize() + headerSize();
}
+ /**
+ * Reads a page.
+ *
+ * @param pageId Page ID.
+ * @param pageBuf Page buffer to read into.
+ * @param keepCrc By default, reading zeroes CRC which was on page store, but you can keep it in {@code pageBuf} if set {@code true}.
+ * @return {@code True} if the page was successfully read, {@code false} if the delta file page store has already been
+ * {@link #markMergedToFilePageStore() merged} with the file page store (the page must then be read from the file page store).
+ * @throws IgniteInternalCheckedException If reading failed (IO error occurred).
+ */
+ public boolean readWithMergedToFilePageStoreCheck(
+ long pageId,
+ long pageOff,
+ ByteBuffer pageBuf,
+ boolean keepCrc
+ ) throws IgniteInternalCheckedException {
+ if (!mergedBusyLock.enterBusy()) {
+ return false;
+ }
+
+ try {
+ super.read(pageId, pageOff, pageBuf, keepCrc);
+
+ return true;
+ } finally {
+ mergedBusyLock.leaveBusy();
+ }
+ }
+
/**
* Returns the index of the delta file page store.
*/
public int fileIndex() {
return header.index();
}
+
+ /**
+ * Marks that the delta file page store has been merged with the file page store.
+ *
+ * <p>It waits for all current {@link #readWithMergedToFilePageStoreCheck(long, long, ByteBuffer, boolean) readings} to end, and
+ * subsequent ones will return {@code false} (will need to read from the file page store not from delta file page store).
+ */
+ public void markMergedToFilePageStore() {
+ mergedBusyLock.block();
+ }
+
+ /**
+ * Returns page indexes of the delta file page store.
+ */
+ public int[] pageIndexes() {
+ return header.pageIndexes();
+ }
}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java
index 9ff231861..5a3b3a2e1 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Comparator;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -168,9 +169,9 @@ public class FilePageStore implements PageStore {
long pageOff = deltaFilePageStoreIo.pageOffset(pageId);
if (pageOff >= 0) {
- deltaFilePageStoreIo.read(pageId, pageOff, pageBuf, keepCrc);
-
- return;
+ if (deltaFilePageStoreIo.readWithMergedToFilePageStoreCheck(pageId, pageOff, pageBuf, keepCrc)) {
+ return;
+ }
}
}
@@ -182,17 +183,7 @@ public class FilePageStore implements PageStore {
public void read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException {
assert pageIndex(pageId) <= pageCount : "pageIdx=" + pageIndex(pageId) + ", pageCount=" + pageCount;
- for (DeltaFilePageStoreIo deltaFilePageStoreIo : deltaFilePageStoreIos) {
- long pageOff = deltaFilePageStoreIo.pageOffset(pageId);
-
- if (pageOff >= 0) {
- deltaFilePageStoreIo.read(pageId, pageOff, pageBuf, keepCrc);
-
- return;
- }
- }
-
- filePageStoreIo.read(pageId, filePageStoreIo.pageOffset(pageId), pageBuf, keepCrc);
+ readWithoutPageIdCheck(pageId, pageBuf, keepCrc);
}
/** {@inheritDoc} */
@@ -344,4 +335,44 @@ public class FilePageStore implements PageStore {
public int deltaFileCount() {
return deltaFilePageStoreIos.size();
}
+
+ /**
+ * Returns the delta file to compact (the oldest one), or {@code null} if there is no delta file that can be compacted yet.
+ *
+ * <p>Thread safe.
+ */
+ public @Nullable DeltaFilePageStoreIo getDeltaFileToCompaction() {
+ // Snapshot of delta files.
+ Iterator<DeltaFilePageStoreIo> iterator = deltaFilePageStoreIos.iterator();
+
+ // Last one is the oldest.
+ DeltaFilePageStoreIo last = null;
+
+ int count = 0;
+
+ while (iterator.hasNext()) {
+ last = iterator.next();
+
+ count++;
+ }
+
+ // If last is just created, then it cannot be compacted yet.
+ if (count == 1 && newDeltaFilePageStoreIoFuture != null) {
+ last = null;
+ }
+
+ return last;
+ }
+
+ /**
+ * Removes the delta file from this file page store's list of delta files.
+ *
+ * <p>Thread safe.
+ *
+ * @param deltaFilePageStoreIo Delta file to be deleted.
+ * @return {@code True} if the delta file being removed was present.
+ */
+ public boolean removeDeltaFile(DeltaFilePageStoreIo deltaFilePageStoreIo) {
+ return deltaFilePageStoreIos.remove(deltaFilePageStoreIo);
+ }
}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreIo.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreIo.java
index 0d340c9e9..e14d397bf 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreIo.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreIo.java
@@ -28,6 +28,7 @@ import java.nio.file.Path;
import org.apache.ignite.internal.fileio.FileIo;
import org.apache.ignite.internal.fileio.FileIoFactory;
import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
/**
* Implementation of the class for working with the file page file storage IO.
@@ -83,6 +84,12 @@ public class FilePageStoreIo extends AbstractFilePageStoreIo {
checkFilePageSize(this.header.pageSize(), header.pageSize());
}
+ /** {@inheritDoc} */
+ @Override
+ public void read(long pageId, long pageOff, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException {
+ super.read(pageId, pageOff, pageBuf, keepCrc); // Pure delegation. NOTE(review): presumably overridden only to widen the inherited method's visibility - confirm.
+ }
+
/** {@inheritDoc} */
@Override
public long pageOffset(long pageId) {
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java
index 64d67fd28..70a15608c 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java
@@ -21,6 +21,8 @@ import static java.nio.file.Files.createDirectories;
import static java.util.Collections.unmodifiableList;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.pagememory.PageIdAllocator.MAX_PARTITION_ID;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
import static org.apache.ignite.internal.util.GridUnsafe.allocateBuffer;
import static org.apache.ignite.internal.util.GridUnsafe.freeBuffer;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
@@ -82,9 +84,6 @@ public class FilePageStoreManager implements PageReadWriteManager {
/** Page size in bytes. */
private final int pageSize;
- /** Page read write manager. */
- private final PageReadWriteManager pageReadWriteManager = new PageReadWriteManagerImpl(this);
-
/**
* Executor to disallow running code that modifies data in {@link #groupPageStores} concurrently with cleanup of file page store.
*/
@@ -180,7 +179,15 @@ public class FilePageStoreManager implements PageReadWriteManager {
/** {@inheritDoc} */
@Override
public void read(int grpId, long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException {
- pageReadWriteManager.read(grpId, pageId, pageBuf, keepCrc);
+ FilePageStore pageStore = getStore(grpId, partitionId(pageId));
+
+ try {
+ pageStore.read(pageId, pageBuf, keepCrc);
+ } catch (IgniteInternalCheckedException e) {
+ // TODO: IGNITE-16899 By analogy with 2.0, fail a node
+
+ throw e;
+ }
}
/** {@inheritDoc} */
@@ -191,13 +198,35 @@ public class FilePageStoreManager implements PageReadWriteManager {
ByteBuffer pageBuf,
boolean calculateCrc
) throws IgniteInternalCheckedException {
- return pageReadWriteManager.write(grpId, pageId, pageBuf, calculateCrc);
+ FilePageStore pageStore = getStore(grpId, partitionId(pageId));
+
+ try {
+ pageStore.write(pageId, pageBuf, calculateCrc);
+ } catch (IgniteInternalCheckedException e) {
+ // TODO: IGNITE-16899 By analogy with 2.0, fail a node
+
+ throw e;
+ }
+
+ return pageStore;
}
/** {@inheritDoc} */
@Override
public long allocatePage(int grpId, int partId, byte flags) throws IgniteInternalCheckedException {
- return pageReadWriteManager.allocatePage(grpId, partId, flags);
+ assert partId >= 0 && partId <= MAX_PARTITION_ID : partId;
+
+ FilePageStore pageStore = getStore(grpId, partId);
+
+ try {
+ int pageIdx = pageStore.allocatePage();
+
+ return pageId(partId, flags, pageIdx);
+ } catch (IgniteInternalCheckedException e) {
+ // TODO: IGNITE-16899 By analogy with 2.0, fail a node
+
+ throw e;
+ }
}
/**
@@ -239,6 +268,13 @@ public class FilePageStoreManager implements PageReadWriteManager {
return groupPageStores.get(grpId);
}
+ /**
+ * Returns all page stores of all groups, as reported by {@code groupPageStores}.
+ *
+ * @return Collection of page store lists (presumably one list per group - confirm against the {@code groupPageStores} implementation).
+ */
+ public Collection<List<FilePageStore>> allPageStores() {
+ return groupPageStores.allPageStores();
+ }
+
/**
* Returns partition file page store for the corresponding parameters.
*
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageReadWriteManagerImpl.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageReadWriteManagerImpl.java
deleted file mode 100644
index 94e47164f..000000000
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageReadWriteManagerImpl.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.pagememory.persistence.store;
-
-import static org.apache.ignite.internal.pagememory.PageIdAllocator.MAX_PARTITION_ID;
-import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
-import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.tostring.IgniteToStringExclude;
-import org.apache.ignite.lang.IgniteInternalCheckedException;
-
-/**
- * {@link org.apache.ignite.internal.pagememory.persistence.PageReadWriteManager} implementation.
- */
-class PageReadWriteManagerImpl implements org.apache.ignite.internal.pagememory.persistence.PageReadWriteManager {
- @IgniteToStringExclude
- protected final FilePageStoreManager filePageStoreManager;
-
- /**
- * Constructor.
- *
- * @param filePageStoreManager File page store manager.
- */
- public PageReadWriteManagerImpl(FilePageStoreManager filePageStoreManager) {
- this.filePageStoreManager = filePageStoreManager;
- }
-
- /** {@inheritDoc} */
- @Override
- public void read(int grpId, long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException {
- FilePageStore pageStore = filePageStoreManager.getStore(grpId, partitionId(pageId));
-
- try {
- pageStore.read(pageId, pageBuf, keepCrc);
- } catch (IgniteInternalCheckedException e) {
- // TODO: IGNITE-16899 By analogy with 2.0, fail a node
-
- throw e;
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public PageStore write(
- int grpId,
- long pageId,
- ByteBuffer pageBuf,
- boolean calculateCrc
- ) throws IgniteInternalCheckedException {
- FilePageStore pageStore = filePageStoreManager.getStore(grpId, partitionId(pageId));
-
- try {
- pageStore.write(pageId, pageBuf, calculateCrc);
- } catch (IgniteInternalCheckedException e) {
- // TODO: IGNITE-16899 By analogy with 2.0, fail a node
-
- throw e;
- }
-
- return pageStore;
- }
-
- /** {@inheritDoc} */
- @Override
- public long allocatePage(int grpId, int partId, byte flags) throws IgniteInternalCheckedException {
- assert partId >= 0 && partId <= MAX_PARTITION_ID : partId;
-
- FilePageStore pageStore = filePageStoreManager.getStore(grpId, partId);
-
- try {
- int pageIdx = pageStore.allocatePage();
-
- return pageId(partId, flags, pageIdx);
- } catch (IgniteInternalCheckedException e) {
- // TODO: IGNITE-16899 By analogy with 2.0, fail a node
-
- throw e;
- }
- }
-}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageStore.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageStore.java
index 9866ec28a..66d778a6a 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageStore.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageStore.java
@@ -51,8 +51,7 @@ public interface PageStore extends Closeable {
*
* @param pageId Page ID.
* @param pageBuf Page buffer to read into.
- * @param keepCrc By default, reading zeroes CRC which was on page store, but you can keep it in {@code pageBuf} if set {@code
- * keepCrc}.
+ * @param keepCrc By default, reading zeroes CRC which was on page store, but you can keep it in {@code pageBuf} if set {@code true}.
* @throws IgniteInternalCheckedException If reading failed (IO error occurred).
*/
void read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException;
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
index d4ad1f6b4..3e5f2b05f 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
@@ -46,6 +46,7 @@ import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -71,6 +72,7 @@ import org.apache.ignite.internal.pagememory.persistence.PartitionMeta;
import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
import org.apache.ignite.internal.pagememory.persistence.WriteDirtyPage;
+import org.apache.ignite.internal.pagememory.persistence.compaction.Compactor;
import org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo;
import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
@@ -91,7 +93,7 @@ public class CheckpointerTest {
private final IgniteLogger log = Loggers.forClass(CheckpointerTest.class);
- @InjectConfiguration("mock : {threads=1, frequency=1000, frequencyDeviation=0}")
+ @InjectConfiguration("mock : {checkpointThreads=1, frequency=1000, frequencyDeviation=0}")
private PageMemoryCheckpointConfiguration checkpointConfig;
@BeforeAll
@@ -116,6 +118,7 @@ public class CheckpointerTest {
createCheckpointWorkflow(EMPTY),
createCheckpointPagesWriterFactory(mock(PartitionMetaManager.class)),
mock(FilePageStoreManager.class),
+ mock(Compactor.class),
checkpointConfig
);
@@ -148,6 +151,7 @@ public class CheckpointerTest {
mock(CheckpointWorkflow.class),
mock(CheckpointPagesWriterFactory.class),
mock(FilePageStoreManager.class),
+ mock(Compactor.class),
checkpointConfig
));
@@ -247,6 +251,7 @@ public class CheckpointerTest {
mock(CheckpointWorkflow.class),
mock(CheckpointPagesWriterFactory.class),
mock(FilePageStoreManager.class),
+ mock(Compactor.class),
checkpointConfig
);
@@ -275,6 +280,7 @@ public class CheckpointerTest {
createCheckpointWorkflow(EMPTY),
createCheckpointPagesWriterFactory(mock(PartitionMetaManager.class)),
mock(FilePageStoreManager.class),
+ mock(Compactor.class),
checkpointConfig
));
@@ -352,6 +358,8 @@ public class CheckpointerTest {
when(filePageStore.getNewDeltaFile()).thenReturn(completedFuture(mock(DeltaFilePageStoreIo.class)));
+ Compactor compactor = mock(Compactor.class);
+
Checkpointer checkpointer = spy(new Checkpointer(
log,
"test",
@@ -360,6 +368,7 @@ public class CheckpointerTest {
createCheckpointWorkflow(dirtyPages),
createCheckpointPagesWriterFactory(partitionMetaManager),
createFilePageStoreManager(Map.of(new GroupPartitionId(0, 0), filePageStore)),
+ compactor,
checkpointConfig
));
@@ -367,6 +376,7 @@ public class CheckpointerTest {
verify(dirtyPages, times(1)).toDirtyPageIdQueue();
verify(checkpointer, times(1)).startCheckpointProgress();
+ verify(compactor, times(1)).addDeltaFiles(eq(1));
assertEquals(checkpointer.lastCheckpointProgress().currentCheckpointPagesCount(), 3);
}
@@ -381,6 +391,7 @@ public class CheckpointerTest {
mock(CheckpointWorkflow.class),
mock(CheckpointPagesWriterFactory.class),
mock(FilePageStoreManager.class),
+ mock(Compactor.class),
checkpointConfig
);
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java
new file mode 100644
index 000000000..d065efcda
--- /dev/null
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.compaction;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.pagememory.io.PageIo;
+import org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo;
+import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
+import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
+import org.junit.jupiter.api.Test;
+
+/**
+ * For {@link Compactor} testing.
+ *
+ * <p>Every test builds its own {@link Compactor} on top of a Mockito-mocked {@link FilePageStoreManager} and a configuration value
+ * mock that returns {@code 1}; page stores and delta files used by the tests are Mockito mocks as well.
+ */
+public class CompactorTest {
+ private static final int PAGE_SIZE = 1024;
+
+ private final IgniteLogger log = Loggers.forClass(CompactorTest.class);
+
+ @Test
+ void testStartAndStop() throws Exception { // Checks runner/cancelled/done state before and after start() and stop().
+ Compactor compactor = new Compactor(log, "test", null, threadsConfig(1), mock(FilePageStoreManager.class), PAGE_SIZE);
+
+ compactor.start(); // NOTE(review): runner() is asserted null right below and start() is called again later - confirm this first call is intended.
+
+ assertNull(compactor.runner());
+
+ assertFalse(compactor.isCancelled());
+ assertFalse(compactor.isDone());
+ assertFalse(Thread.currentThread().isInterrupted());
+
+ compactor.start();
+
+ assertTrue(waitForCondition(() -> compactor.runner() != null, 10, 1_000));
+
+ compactor.stop();
+
+ assertTrue(waitForCondition(() -> compactor.runner() == null, 10, 1_000));
+
+ assertTrue(compactor.isCancelled());
+ assertTrue(compactor.isDone());
+ assertFalse(Thread.currentThread().isInterrupted()); // Stopping must not leave the calling thread interrupted.
+ }
+
+ @Test
+ void testMergeDeltaFileToMainFile() throws Throwable { // Checks the calls made while merging one single-page delta file into the main file.
+ Compactor compactor = new Compactor(log, "test", null, threadsConfig(1), mock(FilePageStoreManager.class), PAGE_SIZE);
+
+ FilePageStore filePageStore = mock(FilePageStore.class);
+ DeltaFilePageStoreIo deltaFilePageStoreIo = mock(DeltaFilePageStoreIo.class);
+
+ when(filePageStore.removeDeltaFile(eq(deltaFilePageStoreIo))).thenReturn(true);
+
+ when(deltaFilePageStoreIo.pageIndexes()).thenReturn(new int[]{0}); // The delta file reports a single page index: 0.
+
+ when(deltaFilePageStoreIo.readWithMergedToFilePageStoreCheck(anyLong(), anyLong(), any(ByteBuffer.class), anyBoolean()))
+ .then(answer -> {
+ ByteBuffer buffer = answer.getArgument(2);
+
+ PageIo.setPageId(bufferAddress(buffer), 1); // Stamp page ID 1 into the buffer; the write below is verified against this ID.
+
+ return true;
+ });
+
+ compactor.mergeDeltaFileToMainFile(filePageStore, deltaFilePageStoreIo);
+
+ verify(deltaFilePageStoreIo, times(1)).readWithMergedToFilePageStoreCheck(eq(0L), eq(0L), any(ByteBuffer.class), anyBoolean());
+ verify(filePageStore, times(1)).write(eq(1L), any(ByteBuffer.class), anyBoolean()); // Written under the page ID taken from the buffer (1), not the ID passed to the read (0).
+
+ verify(filePageStore, times(1)).sync();
+ verify(filePageStore, times(1)).removeDeltaFile(eq(deltaFilePageStoreIo));
+
+ verify(deltaFilePageStoreIo, times(1)).markMergedToFilePageStore();
+ verify(deltaFilePageStoreIo, times(1)).stop(eq(true));
+ }
+
+ @Test
+ void testDoCompaction() throws Throwable { // Checks that doCompaction() asks the page store for its delta file and passes both to the merge.
+ FilePageStore filePageStore = mock(FilePageStore.class);
+
+ DeltaFilePageStoreIo deltaFilePageStoreIo = mock(DeltaFilePageStoreIo.class);
+
+ when(filePageStore.getDeltaFileToCompaction()).thenReturn(deltaFilePageStoreIo);
+
+ FilePageStoreManager filePageStoreManager = mock(FilePageStoreManager.class);
+
+ when(filePageStoreManager.allPageStores()).thenReturn(List.of(List.of(filePageStore)));
+
+ Compactor compactor = spy(new Compactor(log, "test", null, threadsConfig(1), filePageStoreManager, PAGE_SIZE));
+
+ doAnswer(answer -> { // Replace the real merge on the spy; only the arguments are checked.
+ assertSame(filePageStore, answer.getArgument(0));
+ assertSame(deltaFilePageStoreIo, answer.getArgument(1));
+
+ return null;
+ })
+ .when(compactor)
+ .mergeDeltaFileToMainFile(any(FilePageStore.class), any(DeltaFilePageStoreIo.class));
+
+ compactor.doCompaction();
+
+ verify(filePageStore, times(1)).getDeltaFileToCompaction();
+
+ verify(compactor, times(1)).mergeDeltaFileToMainFile(any(FilePageStore.class), any(DeltaFilePageStoreIo.class));
+ }
+
+ @Test
+ void testBody() throws Exception { // Checks a single pass of body(): wait for delta files, then compact, then exit once cancelled.
+ Compactor compactor = spy(new Compactor(log, "test", null, threadsConfig(1), mock(FilePageStoreManager.class), PAGE_SIZE));
+
+ doNothing().when(compactor).waitDeltaFiles();
+
+ doAnswer(answer -> {
+ compactor.cancel(); // Cancel from inside the stubbed compaction so that body() returns.
+
+ return null;
+ }).when(compactor).doCompaction();
+
+ compactor.body();
+
+ verify(compactor, times(3)).isCancelled(); // NOTE(review): exact count is tied to how often body() polls isCancelled() - brittle if the loop changes.
+ verify(compactor, times(1)).waitDeltaFiles();
+ verify(compactor, times(1)).doCompaction();
+ }
+
+ @Test
+ void testWaitDeltaFiles() throws Exception { // Checks that waitDeltaFiles() does not return until addDeltaFiles() is called.
+ Compactor compactor = spy(new Compactor(log, "test", null, threadsConfig(1), mock(FilePageStoreManager.class), PAGE_SIZE));
+
+ CompletableFuture<?> waitDeltaFilesFuture = runAsync(compactor::waitDeltaFiles);
+
+ assertThrows(TimeoutException.class, () -> waitDeltaFilesFuture.get(100, MILLISECONDS)); // Still waiting after 100 ms.
+
+ compactor.addDeltaFiles(1);
+
+ waitDeltaFilesFuture.get(100, MILLISECONDS); // NOTE(review): 100 ms window is timing-dependent; may be tight on a loaded machine.
+ }
+
+ @Test
+ void testCancel() throws Exception { // Checks that cancel() sets the cancelled flag and releases a thread waiting in waitDeltaFiles().
+ Compactor compactor = spy(new Compactor(log, "test", null, threadsConfig(1), mock(FilePageStoreManager.class), PAGE_SIZE));
+
+ assertFalse(compactor.isCancelled());
+
+ CompletableFuture<?> waitDeltaFilesFuture = runAsync(compactor::waitDeltaFiles);
+
+ assertThrows(TimeoutException.class, () -> waitDeltaFilesFuture.get(100, MILLISECONDS));
+
+ compactor.cancel();
+
+ assertTrue(compactor.isCancelled());
+ assertFalse(Thread.currentThread().isInterrupted()); // cancel() must not interrupt the calling thread.
+
+ waitDeltaFilesFuture.get(100, MILLISECONDS); // NOTE(review): 100 ms window is timing-dependent.
+ }
+
+ private static ConfigurationValue<Integer> threadsConfig(int threads) { // Builds a configuration value mock whose value() returns the given number.
+ ConfigurationValue<Integer> configValue = mock(ConfigurationValue.class); // Raw-class mock: unchecked assignment to the generic type.
+
+ when(configValue.value()).thenReturn(threads);
+
+ return configValue;
+ }
+}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIoTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIoTest.java
index b72d7783e..cd0cd9bbc 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIoTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIoTest.java
@@ -227,7 +227,7 @@ public abstract class AbstractFilePageStoreIoTest {
long expPageId = createDataPageId(() -> 0);
- ByteBuffer pageByteBuffer = createPageByteBuffer(0, PAGE_SIZE);
+ ByteBuffer pageByteBuffer = createPageByteBuffer(expPageId, PAGE_SIZE);
// Puts random bytes after: type (2 byte) + version (2 byte) + crc (4 byte).
pageByteBuffer.position(8).put(randomBytes(128));
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIoTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIoTest.java
index 27beb4c08..cebf1bd4a 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIoTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIoTest.java
@@ -23,14 +23,21 @@ import static java.nio.file.StandardOpenOption.WRITE;
import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
import static org.apache.ignite.internal.pagememory.persistence.store.FilePageStore.DELTA_FILE_VERSION_1;
import static org.apache.ignite.internal.pagememory.persistence.store.TestPageStoreUtils.arr;
+import static org.apache.ignite.internal.pagememory.persistence.store.TestPageStoreUtils.createDataPageId;
+import static org.apache.ignite.internal.pagememory.persistence.store.TestPageStoreUtils.createPageByteBuffer;
+import static org.apache.ignite.internal.pagememory.persistence.store.TestPageStoreUtils.randomBytes;
import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.startsWith;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.stream.IntStream;
import org.apache.ignite.internal.fileio.FileIo;
@@ -50,11 +57,19 @@ public class DeltaFilePageStoreIoTest extends AbstractFilePageStoreIoTest {
try (DeltaFilePageStoreIo filePageStoreIo = createFilePageStoreIo(testFilePath, header)) {
assertEquals(PAGE_SIZE, filePageStoreIo.pageOffset(pageId(0, FLAG_DATA, 0)));
+ assertEquals(PAGE_SIZE, filePageStoreIo.pageOffset(0));
+
assertEquals(2 * PAGE_SIZE, filePageStoreIo.pageOffset(pageId(0, FLAG_DATA, 1)));
+ assertEquals(2 * PAGE_SIZE, filePageStoreIo.pageOffset(1));
+
assertEquals(3 * PAGE_SIZE, filePageStoreIo.pageOffset(pageId(0, FLAG_DATA, 2)));
+ assertEquals(3 * PAGE_SIZE, filePageStoreIo.pageOffset(2));
assertEquals(-1, filePageStoreIo.pageOffset(pageId(0, FLAG_DATA, 3)));
+ assertEquals(-1, filePageStoreIo.pageOffset(3));
+
assertEquals(-1, filePageStoreIo.pageOffset(pageId(0, FLAG_DATA, 4)));
+ assertEquals(-1, filePageStoreIo.pageOffset(4));
}
}
@@ -134,6 +149,51 @@ public class DeltaFilePageStoreIoTest extends AbstractFilePageStoreIoTest {
}
}
+ @Test
+ void testMergedToFilePageStore() throws Exception { // Checks readWithMergedToFilePageStoreCheck() before and after markMergedToFilePageStore().
+ DeltaFilePageStoreIoHeader header = new DeltaFilePageStoreIoHeader(DELTA_FILE_VERSION_1, 1, PAGE_SIZE, arr(0, 1, 2));
+
+ try (DeltaFilePageStoreIo filePageStoreIo = createFilePageStoreIo(workDir.resolve("test"), header)) {
+ // Preparation for reading.
+ long pageId = createDataPageId(() -> 0);
+
+ ByteBuffer pageByteBuffer = createPageByteBuffer(pageId, PAGE_SIZE);
+
+ // Puts random bytes after: type (2 byte) + version (2 byte) + crc (4 byte).
+ pageByteBuffer.position(8).put(randomBytes(128));
+
+ filePageStoreIo.write(pageId, pageByteBuffer.rewind(), true);
+
+ filePageStoreIo.sync();
+
+ // Checking readings.
+ ByteBuffer buffer = ByteBuffer.allocateDirect(PAGE_SIZE).order(pageByteBuffer.order());
+
+ long pageOff = filePageStoreIo.pageOffset(pageId);
+
+ assertTrue(filePageStoreIo.readWithMergedToFilePageStoreCheck(pageId, pageOff, buffer, false)); // Not merged yet: the read is expected to succeed.
+
+ assertEquals(pageByteBuffer.rewind(), buffer.rewind()); // The page read back must equal the page written.
+
+ buffer.rewind().put(new byte[PAGE_SIZE]); // Zero the buffer so that a skipped read is detectable below.
+
+ filePageStoreIo.markMergedToFilePageStore();
+
+ assertFalse(filePageStoreIo.readWithMergedToFilePageStoreCheck(pageId, pageOff, buffer.rewind(), false)); // After the mark the read is expected to be refused.
+
+ assertEquals(ByteBuffer.allocateDirect(PAGE_SIZE).order(pageByteBuffer.order()), buffer.rewind()); // Buffer must still be all zeroes.
+ }
+ }
+
+ @Test
+ void testPageIndexes() throws Exception { // Checks that pageIndexes() returns the page indexes the header was created with.
+ DeltaFilePageStoreIoHeader header = new DeltaFilePageStoreIoHeader(DELTA_FILE_VERSION_1, 1, PAGE_SIZE, arr(0, 1, 2));
+
+ try (DeltaFilePageStoreIo filePageStoreIo = createFilePageStoreIo(workDir.resolve("test"), header)) {
+ assertArrayEquals(arr(0, 1, 2), filePageStoreIo.pageIndexes());
+ }
+ }
+
/** {@inheritDoc} */
@Override
DeltaFilePageStoreIo createFilePageStoreIo(Path filePath) {
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManagerTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManagerTest.java
index 959db3983..52588245c 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManagerTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManagerTest.java
@@ -45,6 +45,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.ignite.internal.fileio.RandomAccessFileIoFactory;
@@ -351,6 +352,24 @@ public class FilePageStoreManagerTest {
);
}
+ @Test
+ void testAllPageStores() throws Exception { // Checks that allPageStores() exposes the page stores of every initialized group.
+ FilePageStoreManager manager = createManager();
+
+ manager.start();
+
+ manager.initialize("test0", 1, 1); // NOTE(review): expected paths below suggest (name, group ID, partition count) - confirm the parameter meaning.
+ manager.initialize("test1", 2, 1);
+
+ assertThat(
+ manager.allPageStores().stream().flatMap(List::stream).map(FilePageStore::filePath).collect(toList()), // Flatten the lists into file paths.
+ containsInAnyOrder(
+ workDir.resolve("db/table-1").resolve("part-0.bin"),
+ workDir.resolve("db/table-2").resolve("part-0.bin")
+ )
+ );
+ }
+
private FilePageStoreManager createManager() throws Exception {
return new FilePageStoreManager(log, "test", workDir, new RandomAccessFileIoFactory(), 1024);
}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreTest.java
index 19e6b20ae..573522291 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.pagememory.persistence.store;
+import static java.nio.ByteOrder.nativeOrder;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.internal.pagememory.persistence.store.FilePageStore.DELTA_FILE_VERSION_1;
import static org.apache.ignite.internal.pagememory.persistence.store.FilePageStore.VERSION_1;
@@ -29,8 +30,10 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
@@ -38,6 +41,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
import java.nio.file.Path;
@@ -411,6 +415,94 @@ public class FilePageStoreTest {
}
}
+ @Test
+ void testRemoveDeltaFile() throws Exception { // Checks the return value of removeDeltaFile() and its effect on deltaFileCount().
+ DeltaFilePageStoreIo deltaFile0 = mock(DeltaFilePageStoreIo.class);
+ DeltaFilePageStoreIo deltaFile1 = mock(DeltaFilePageStoreIo.class);
+
+ try (FilePageStore filePageStore = createFilePageStore(workDir.resolve("test"), deltaFile0, deltaFile1)) {
+ assertEquals(2, filePageStore.deltaFileCount());
+
+ assertTrue(filePageStore.removeDeltaFile(deltaFile0));
+ assertFalse(filePageStore.removeDeltaFile(deltaFile0)); // A second removal of the same delta file must report that it was absent.
+
+ assertEquals(1, filePageStore.deltaFileCount());
+
+ assertTrue(filePageStore.removeDeltaFile(deltaFile1));
+ assertFalse(filePageStore.removeDeltaFile(deltaFile1));
+
+ assertEquals(0, filePageStore.deltaFileCount());
+ }
+ }
+
+ @Test
+ void testGetDeltaFileToCompaction() throws Exception { // Checks which delta file is offered for compaction as delta files are created and removed.
+ DeltaFilePageStoreIo deltaFile0 = mock(DeltaFilePageStoreIo.class);
+ DeltaFilePageStoreIo deltaFile1 = mock(DeltaFilePageStoreIo.class);
+
+ when(deltaFile0.fileIndex()).thenReturn(0);
+ when(deltaFile0.fileIndex()).thenReturn(1); // NOTE(review): stubs deltaFile0 again (overriding the line above); deltaFile1 is never stubbed - confirm intent before changing, the assertions below may rely on it.
+
+ try (FilePageStore filePageStore = createFilePageStore(workDir.resolve("test"), deltaFile0, deltaFile1)) {
+ assertSame(deltaFile1, filePageStore.getDeltaFileToCompaction());
+
+ CompletableFuture<DeltaFilePageStoreIo> createNewDeltaFileFuture = filePageStore.getOrCreateNewDeltaFile(
+ index -> workDir.resolve("delta" + index),
+ TestPageStoreUtils::arr
+ );
+
+ createNewDeltaFileFuture.get(1, SECONDS);
+
+ assertSame(deltaFile1, filePageStore.getDeltaFileToCompaction()); // Creating a new delta file must not change the candidate.
+
+ filePageStore.removeDeltaFile(deltaFile1);
+
+ assertSame(deltaFile0, filePageStore.getDeltaFileToCompaction());
+
+ filePageStore.removeDeltaFile(deltaFile0);
+
+ assertNull(filePageStore.getDeltaFileToCompaction()); // Presumably only the not yet completed new delta file is left, which must not be offered.
+
+ filePageStore.completeNewDeltaFile();
+
+ assertSame(createNewDeltaFileFuture.join(), filePageStore.getDeltaFileToCompaction()); // Once completed, the new delta file becomes the candidate.
+
+ filePageStore.removeDeltaFile(createNewDeltaFileFuture.join());
+
+ assertNull(filePageStore.getDeltaFileToCompaction());
+ }
+ }
+
+ @Test
+ void testReadWithMergedDeltaFiles() throws Exception { // Checks that a read falls back to the main file once the delta file is marked as merged.
+ RandomAccessFileIoFactory ioFactory = new RandomAccessFileIoFactory();
+
+ FilePageStoreHeader header = new FilePageStoreHeader(VERSION_1, PAGE_SIZE);
+ DeltaFilePageStoreIoHeader deltaHeader = new DeltaFilePageStoreIoHeader(DELTA_FILE_VERSION_1, 0, PAGE_SIZE, arr(0));
+
+ try (
+ DeltaFilePageStoreIo deltaIo = spy(new DeltaFilePageStoreIo(ioFactory, deltaFilePath(0), deltaHeader)); // Spies on real IO objects so that calls can be counted.
+ FilePageStoreIo storeIo = spy(new FilePageStoreIo(ioFactory, workDir.resolve("test"), header));
+ FilePageStore filePageStore = new FilePageStore(storeIo, deltaIo);
+ ) {
+ long pageId = createDataPageId(filePageStore::allocatePage);
+
+ ByteBuffer buffer = ByteBuffer.allocateDirect(PAGE_SIZE).order(nativeOrder());
+
+ filePageStore.read(pageId, buffer, true);
+
+ verify(deltaIo, times(1)).readWithMergedToFilePageStoreCheck(eq(pageId), anyLong(), eq(buffer), eq(true)); // First read is served by the delta file...
+ verify(storeIo, times(0)).read(eq(pageId), anyLong(), eq(buffer), eq(true)); // ...so the main file is not touched.
+
+ deltaIo.markMergedToFilePageStore();
+
+ filePageStore.read(pageId, buffer.rewind(), true);
+
+ verify(deltaIo, times(2)).readWithMergedToFilePageStoreCheck(eq(pageId), anyLong(), eq(buffer), eq(true)); // The delta file is still consulted (second call in total)...
+ verify(storeIo, times(1)).read(eq(pageId), anyLong(), eq(buffer), eq(true)); // ...but the page now comes from the main file.
+ }
+ }
+
private static FilePageStore createFilePageStore(Path filePath) {
return createFilePageStore(filePath, new FilePageStoreHeader(VERSION_1, PAGE_SIZE));
}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/PageReadWriteManagerImplTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/PageReadWriteManagerImplTest.java
deleted file mode 100644
index 723669d97..000000000
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/PageReadWriteManagerImplTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.pagememory.persistence.store;
-
-import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.nio.ByteBuffer;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-/**
- * For {@link PageReadWriteManagerImpl} testing.
- */
-public class PageReadWriteManagerImplTest {
- private FilePageStore filePageStore;
-
- private FilePageStoreManager filePageStoreManager;
-
- private PageReadWriteManagerImpl pageReadWriteManager;
-
- @BeforeEach
- void setUp() throws Exception {
- filePageStore = mock(FilePageStore.class);
-
- filePageStoreManager = mock(FilePageStoreManager.class);
-
- when(filePageStoreManager.getStore(0, 0)).thenReturn(filePageStore);
-
- pageReadWriteManager = new PageReadWriteManagerImpl(filePageStoreManager);
- }
-
- @Test
- void testRead() throws Exception {
- long pageId = pageId(0, (byte) 0, 0);
-
- ByteBuffer pageBuffer = mock(ByteBuffer.class);
-
- pageReadWriteManager.read(0, pageId, pageBuffer, true);
-
- verify(filePageStore, times(1)).read(pageId, pageBuffer, true);
- }
-
- @Test
- void testWrite() throws Exception {
- long pageId = pageId(0, (byte) 0, 0);
-
- ByteBuffer pageBuffer = mock(ByteBuffer.class);
-
- assertSame(filePageStore, pageReadWriteManager.write(0, pageId, pageBuffer, true));
-
- verify(filePageStore, times(1)).write(pageId, pageBuffer, true);
- }
-
- @Test
- void testAllocatePage() throws Exception {
- assertEquals(
- pageId(0, (byte) 1, 0),
- pageReadWriteManager.allocatePage(0, 0, (byte) 1)
- );
-
- verify(filePageStore, times(1)).allocatePage();
- }
-}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
index 529930123..593ce5c59 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
@@ -72,6 +72,12 @@ class PersistentPageMemoryTableStorage extends AbstractPageMemoryTableStorage {
try {
dataRegion.filePageStoreManager().initialize(tableView.name(), tableView.tableId(), tableView.partitions());
+
+ int deltaFileCount = dataRegion.filePageStoreManager().getStores(tableView.tableId()).stream()
+ .mapToInt(FilePageStore::deltaFileCount)
+ .sum();
+
+ dataRegion.checkpointManager().addDeltaFileCountForCompaction(deltaFileCount);
} catch (IgniteInternalCheckedException e) {
throw new StorageException("Error initializing file page stores for table: " + tableView.name(), e);
}