You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/05/13 08:47:46 UTC
[ignite-3] branch main updated: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2 (#800)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 7adac2ab3 IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2 (#800)
7adac2ab3 is described below
commit 7adac2ab324a075f2dd9317c546b1129e88d9cc1
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Fri May 13 11:47:41 2022 +0300
IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2 (#800)
---
.../ignite/internal/util/FastTimestamps.java | 1 +
.../apache/ignite/internal/util/IgniteUtils.java | 12 +-
.../ignite/internal/util/worker/IgniteWorker.java | 321 ++++++++++++++
.../util/worker/IgniteWorkerListener.java} | 33 +-
.../util/worker/WorkProgressDispatcher.java} | 32 +-
.../internal/util/worker/IgniteWorkerTest.java | 466 +++++++++++++++++++++
.../pagememory/persistence/PageMemoryImpl.java | 2 +-
.../pagememory/persistence/PageStoreWriter.java | 2 +-
.../persistence/checkpoint/Checkpoint.java | 58 +++
.../checkpoint/CheckpointDirtyPagesInfoHolder.java | 48 +++
.../persistence/checkpoint/CheckpointListener.java | 69 +++
.../checkpoint/CheckpointMarkersStorage.java | 200 +++++++++
.../persistence/checkpoint/CheckpointProgress.java | 24 +-
.../checkpoint/CheckpointProgressImpl.java | 265 ++++++++++++
.../checkpoint/CheckpointReadWriteLock.java | 2 +-
.../persistence/checkpoint/CheckpointState.java | 1 -
.../persistence/checkpoint/CheckpointWorkflow.java | 351 ++++++++++++++++
...Checkpointer.java => CheckpointWriteOrder.java} | 33 +-
.../persistence/checkpoint/Checkpointer.java | 18 +-
.../checkpoint/IgniteConcurrentMultiPairQueue.java | 227 ++++++++++
.../persistence/PageMemoryImplNoLoadTest.java | 4 +-
.../checkpoint/CheckpointMarkersStorageTest.java | 184 ++++++++
.../checkpoint/CheckpointProgressImplTest.java | 342 +++++++++++++++
.../persistence/checkpoint/CheckpointTest.java | 48 +++
.../checkpoint/CheckpointTestUtils.java | 55 +++
.../checkpoint/CheckpointWorkflowTest.java | 445 ++++++++++++++++++++
.../IgniteConcurrentMultiPairQueueTest.java | 149 +++++++
27 files changed, 3333 insertions(+), 59 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java b/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java
index af6f00540..c73cedb2d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java
@@ -31,6 +31,7 @@ public class FastTimestamps {
private static void startUpdater() {
Thread updater = new Thread("FastTimestamps updater") {
+ /** {@inheritDoc} */
@Override
public void run() {
while (true) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 30e0495b8..41a8f3d81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -332,9 +332,17 @@ public class IgniteUtils {
* @return Absolute value.
*/
public static int safeAbs(int i) {
- i = Math.abs(i);
+ return Math.max(Math.abs(i), 0);
+ }
- return i < 0 ? 0 : i;
+ /**
+ * Gets absolute value for long. If long is {@link Long#MIN_VALUE}, then {@code 0} is returned.
+ *
+ * @param i Long value.
+ * @return Absolute value.
+ */
+ public static long safeAbs(long i) {
+ return Math.max(Math.abs(i), 0);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/IgniteWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/IgniteWorker.java
new file mode 100644
index 000000000..0cf0a40f6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/IgniteWorker.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.worker;
+
+import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Extension to standard {@link Runnable} interface.
+ *
+ * <p>Adds proper details to be used with {@link Executor} implementations.
+ */
+public abstract class IgniteWorker implements Runnable, WorkProgressDispatcher {
+ /** Ignite logger. */
+ protected final IgniteLogger log;
+
+ /** Thread name. */
+ private final String name;
+
+ /** Ignite instance name. */
+ private final String igniteInstanceName;
+
+ /** Listener. */
+ private final IgniteWorkerListener listener;
+
+ /** Finish mark. */
+ private volatile boolean finished;
+
+ /** Whether this runnable is cancelled. */
+ protected final AtomicBoolean isCancelled = new AtomicBoolean();
+
+ /** Actual thread runner. */
+ private volatile Thread runner;
+
+ /** Timestamp to be updated by this worker periodically to indicate it's up and running. */
+ private volatile long heartbeatTimestamp;
+
+ /** Atomic field updater to change heartbeat. */
+ private static final AtomicLongFieldUpdater<IgniteWorker> HEARTBEAT_UPDATER =
+ AtomicLongFieldUpdater.newUpdater(IgniteWorker.class, "heartbeatTimestamp");
+
+ /** Mutex for finish awaiting. */
+ private final Object mux = new Object();
+
+ /**
+ * Creates new grid worker with given parameters.
+ *
+ * @param log Logger.
+ * @param igniteInstanceName Name of the Ignite instance this runnable is used in.
+ * @param name Worker name. Note that in general thread name and worker (runnable) name are two different things.
+ * The same worker can be executed by multiple threads and therefore for logging and debugging purposes we separate the two.
+ * @param listener Listener for life-cycle events.
+ */
+ protected IgniteWorker(
+ IgniteLogger log,
+ String igniteInstanceName,
+ String name,
+ @Nullable IgniteWorkerListener listener
+ ) {
+ this.log = log;
+ this.igniteInstanceName = igniteInstanceName;
+ this.name = name;
+ this.listener = listener;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public final void run() {
+ updateHeartbeat();
+
+ // Runner thread must be recorded first as other operations may depend on it being present.
+ runner = Thread.currentThread();
+
+ if (log.isDebugEnabled()) {
+ log.debug("Grid runnable started: " + name);
+ }
+
+ try {
+ if (isCancelled.get()) {
+ onCancelledBeforeWorkerScheduled();
+ }
+
+ // Listener callback.
+ if (listener != null) {
+ listener.onStarted(this);
+ }
+
+ body();
+ } catch (InterruptedException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Caught interrupted exception: " + e);
+ }
+
+ Thread.currentThread().interrupt();
+ } catch (Throwable e) {
+ // Catch everything to make sure that it gets logged properly and
+ // not to kill any threads from the underlying thread pool.
+
+ log.error("Runtime error caught during grid runnable execution: " + this, e);
+
+ if (e instanceof Error) {
+ throw e;
+ }
+ } finally {
+ synchronized (mux) {
+ finished = true;
+
+ mux.notifyAll();
+ }
+
+ cleanup();
+
+ if (listener != null) {
+ listener.onStopped(this);
+ }
+
+ if (log.isDebugEnabled()) {
+ if (isCancelled.get()) {
+ log.debug("Grid runnable finished due to cancellation: " + name);
+ } else if (runner.isInterrupted()) {
+ log.debug("Grid runnable finished due to interruption without cancellation: " + name);
+ } else {
+ log.debug("Grid runnable finished normally: " + name);
+ }
+ }
+
+ // Need to set runner to null, to make sure that
+ // further operations on this runnable won't
+ // affect the thread which could have been recycled
+ // by thread pool.
+ runner = null;
+ }
+ }
+
+ /**
+ * The implementation should provide the execution body for this runnable.
+ *
+ * @throws InterruptedException Thrown in case of interruption.
+ */
+ protected abstract void body() throws InterruptedException;
+
+ /**
+ * Optional method that will be called after runnable is finished. Default implementation is no-op.
+ */
+ protected void cleanup() {
+ /* No-op. */
+ }
+
+ /**
+ * Returns runner thread, {@code null} if the worker has not yet started executing.
+ */
+ public @Nullable Thread runner() {
+ return runner;
+ }
+
+ /**
+ * Returns name of the Ignite instance this runnable belongs to.
+ */
+ public String igniteInstanceName() {
+ return igniteInstanceName;
+ }
+
+ /**
+ * Returns this runnable name.
+ */
+ public String name() {
+ return name;
+ }
+
+ /**
+ * Cancels this runnable.
+ */
+ public void cancel() {
+ if (log.isDebugEnabled()) {
+ log.debug("Cancelling grid runnable: " + this);
+ }
+
+ onCancel(isCancelled.compareAndSet(false, true));
+ }
+
+ /**
+ * Joins this runnable.
+ *
+ * @throws InterruptedException Thrown in case of interruption.
+ */
+ public void join() throws InterruptedException {
+ if (log.isDebugEnabled()) {
+ log.debug("Joining grid runnable: " + this);
+ }
+
+ if ((runner == null && isCancelled.get()) || finished) {
+ return;
+ }
+
+ synchronized (mux) {
+ while (!finished) {
+ mux.wait();
+ }
+ }
+ }
+
+ /**
+ * Returns {@code true} if this runnable is cancelled - {@code false} otherwise.
+ *
+ * @see Future#isCancelled()
+ */
+ public boolean isCancelled() {
+ Thread runner = this.runner;
+
+ return isCancelled.get() || (runner != null && runner.isInterrupted());
+ }
+
+ /**
+ * Returns {@code true} if this runnable is finished - {@code false} otherwise.
+ */
+ public boolean isDone() {
+ return finished;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long heartbeat() {
+ return heartbeatTimestamp;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void updateHeartbeat() {
+ long currentTimestamp = coarseCurrentTimeMillis();
+ long heartbeatTimestamp = this.heartbeatTimestamp;
+
+ // Avoid heartbeat update while in the blocking section.
+ while (heartbeatTimestamp < currentTimestamp) {
+ if (HEARTBEAT_UPDATER.compareAndSet(this, heartbeatTimestamp, currentTimestamp)) {
+ return;
+ }
+
+ heartbeatTimestamp = this.heartbeatTimestamp;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void blockingSectionBegin() {
+ heartbeatTimestamp = Long.MAX_VALUE;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void blockingSectionEnd() {
+ heartbeatTimestamp = coarseCurrentTimeMillis();
+ }
+
+ /**
+ * Can be called from {@link #runner()} thread to perform idleness handling.
+ */
+ protected void onIdle() {
+ if (listener != null) {
+ listener.onIdle(this);
+ }
+ }
+
+ /**
+ * Callback on runner cancellation.
+ *
+ * @param firstCancelRequest Flag indicating that worker cancellation was requested for the first time.
+ */
+ protected void onCancel(boolean firstCancelRequest) {
+ Thread runner = this.runner;
+
+ // Cannot apply Future.cancel() because if we do, then Future.get() would always
+ // throw CancellationException, and we would not be able to wait for task completion.
+ if (runner != null) {
+ runner.interrupt();
+ }
+ }
+
+ /**
+ * Callback on special case, when task is cancelled before it has been scheduled.
+ */
+ protected void onCancelledBeforeWorkerScheduled() {
+ Thread runner = this.runner;
+
+ assert runner != null : this;
+
+ runner.interrupt();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString() {
+ Thread runner = this.runner;
+
+ return S.toString(IgniteWorker.class, this,
+ "hashCode", hashCode(),
+ "interrupted", (runner != null ? runner.isInterrupted() : "unknown"),
+ "runner", (runner == null ? "null" : runner.getName()));
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgress.java b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/IgniteWorkerListener.java
similarity index 56%
copy from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgress.java
copy to modules/core/src/main/java/org/apache/ignite/internal/util/worker/IgniteWorkerListener.java
index 7c9d5eb89..3ebdb08d5 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgress.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/IgniteWorkerListener.java
@@ -15,18 +15,33 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.pagememory.persistence.checkpoint;
-
-
-import java.util.concurrent.CompletableFuture;
+package org.apache.ignite.internal.util.worker;
/**
- * Represents information of progress of a current checkpoint and allows obtaining future to wait for a particular checkpoint state.
+ * This interface defines worker listener.
*/
-// TODO: IGNITE-16898 Continue porting the code
-public interface CheckpointProgress {
+public interface IgniteWorkerListener {
+ /**
+ * Callback before executing the {@link IgniteWorker#body body of the worker}.
+ *
+ * @param worker Started worker.
+ */
+ default void onStarted(IgniteWorker worker) {
+ }
+
+ /**
+ * Callback after executing the {@link IgniteWorker#body body of the worker}.
+ *
+ * @param worker Stopped worker.
+ */
+ default void onStopped(IgniteWorker worker) {
+ }
+
/**
- * Returns future which can be used for detection when current checkpoint reaches the specific state.
+ * Callback on idle worker.
+ *
+ * @param worker Idle worker.
*/
- CompletableFuture<?> futureFor(CheckpointState state);
+ default void onIdle(IgniteWorker worker) {
+ }
}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/WorkProgressDispatcher.java
similarity index 51%
copy from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
copy to modules/core/src/main/java/org/apache/ignite/internal/util/worker/WorkProgressDispatcher.java
index 94056b669..68516d08c 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/WorkProgressDispatcher.java
@@ -15,26 +15,30 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.pagememory.persistence.checkpoint;
-
-import org.jetbrains.annotations.Nullable;
+package org.apache.ignite.internal.util.worker;
/**
- * Empty.
+ * Dispatcher of workers' progress which allows us to understand if worker freezes.
*/
-// TODO: IGNITE-16898 Continue porting the code
-public abstract class Checkpointer {
+public interface WorkProgressDispatcher {
+ /**
+ * Last heartbeat timestamp.
+ */
+ long heartbeat();
+
+ /**
+ * Notifies the dispatcher that work is in progress and the thread didn't freeze.
+ */
+ void updateHeartbeat();
+
/**
- * Changes the information for a scheduled checkpoint if it was scheduled further than {@code delayFromNow}, or do nothing otherwise.
- *
- * @param delayFromNow Delay from now in milliseconds.
- * @param reason Wakeup reason.
- * @return Nearest scheduled checkpoint which is not started yet (dirty pages weren't collected yet).
+ * Protects the worker from timeout penalties if subsequent instructions in the calling thread do not update heartbeat timestamp
+ * timely, e.g. due to blocking operations, up to the nearest {@link #blockingSectionEnd()} call. Nested calls are not supported.
*/
- public abstract CheckpointProgress scheduleCheckpoint(long delayFromNow, String reason);
+ void blockingSectionBegin();
/**
- * Returns runner thread, {@code null} if the worker has not yet started executing.
+ * Closes the protection section previously opened by {@link #blockingSectionBegin()}.
*/
- public abstract @Nullable Thread runner();
+ void blockingSectionEnd();
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/worker/IgniteWorkerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/worker/IgniteWorkerTest.java
new file mode 100644
index 000000000..ca484abe5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/worker/IgniteWorkerTest.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.worker;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static org.apache.ignite.internal.util.worker.IgniteWorkerTest.TestWorkerListener.ON_IDLE;
+import static org.apache.ignite.internal.util.worker.IgniteWorkerTest.TestWorkerListener.ON_STARTED;
+import static org.apache.ignite.internal.util.worker.IgniteWorkerTest.TestWorkerListener.ON_STOPPED;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Test;
+
+/**
+ * For {@link IgniteWorker} testing.
+ */
+public class IgniteWorkerTest {
+ static final String CLEANUP = "cleanup";
+
+ private final IgniteLogger log = IgniteLogger.forClass(IgniteWorkerTest.class);
+
+ @Test
+ void testNewIgniteWorker() {
+ IgniteWorker worker = new NoopWorker(log, null);
+
+ assertEquals("testNode", worker.igniteInstanceName());
+ assertEquals("testWorker", worker.name());
+
+ assertEquals(0, worker.heartbeat());
+
+ assertFalse(worker.isCancelled());
+ assertFalse(worker.isDone());
+
+ assertNull(worker.runner());
+ }
+
+ @Test
+ void testBlockingSelection() {
+ IgniteWorker worker = new NoopWorker(log, null);
+
+ long currentTimeMillis = coarseCurrentTimeMillis();
+
+ worker.blockingSectionBegin();
+
+ assertEquals(Long.MAX_VALUE, worker.heartbeat());
+
+ worker.blockingSectionEnd();
+
+ assertThat(worker.heartbeat(), greaterThanOrEqualTo(currentTimeMillis));
+
+ // Checks update heartbeat after blockingSectionBegin().
+
+ worker.blockingSectionBegin();
+
+ assertEquals(Long.MAX_VALUE, worker.heartbeat());
+
+ worker.updateHeartbeat();
+
+ assertThat(worker.heartbeat(), greaterThanOrEqualTo(currentTimeMillis));
+
+ worker.blockingSectionEnd();
+
+ assertThat(worker.heartbeat(), greaterThanOrEqualTo(currentTimeMillis));
+ }
+
+ @Test
+ void testUpdateHeartbeat() throws Exception {
+ IgniteWorker worker = new NoopWorker(log, null);
+
+ long currentTimeMillis = coarseCurrentTimeMillis();
+
+ worker.updateHeartbeat();
+
+ long heartbeat = worker.heartbeat();
+
+ assertThat(heartbeat, greaterThanOrEqualTo(currentTimeMillis));
+
+ Thread.sleep(10);
+
+ assertEquals(heartbeat, worker.heartbeat());
+
+ worker.updateHeartbeat();
+
+ assertThat(worker.heartbeat(), greaterThan(heartbeat));
+ }
+
+ @Test
+ void testIdle() {
+ List<String> events = new ArrayList<>();
+
+ TestWorkerListener listener = new TestWorkerListener(events);
+
+ IgniteWorker worker = new NoopWorker(log, listener);
+
+ worker.onIdle();
+
+ assertThat(events, equalTo(List.of(ON_IDLE)));
+ }
+
+ @Test
+ void testRun() {
+ List<String> events = new ArrayList<>();
+
+ TestWorkerListener listener = new TestWorkerListener(events) {
+ /** {@inheritDoc} */
+ @Override
+ public void onStarted(IgniteWorker worker) {
+ super.onStarted(worker);
+
+ assertThat(worker.heartbeat(), lessThanOrEqualTo(coarseCurrentTimeMillis()));
+ assertSame(Thread.currentThread(), worker.runner());
+ assertFalse(worker.isCancelled());
+ assertFalse(worker.isDone());
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void onStopped(IgniteWorker worker) {
+ super.onStopped(worker);
+
+ assertThat(worker.heartbeat(), lessThanOrEqualTo(coarseCurrentTimeMillis()));
+ assertSame(Thread.currentThread(), worker.runner());
+ assertFalse(worker.isCancelled());
+ assertTrue(worker.isDone());
+ }
+ };
+
+ IgniteWorker worker = new NoopWorker(log, listener) {
+ /** {@inheritDoc} */
+ @Override
+ protected void cleanup() {
+ events.add(CLEANUP);
+ }
+ };
+
+ worker.run();
+
+ assertThat(events, equalTo(List.of(ON_STARTED, CLEANUP, ON_STOPPED)));
+
+ assertThat(worker.heartbeat(), lessThanOrEqualTo(coarseCurrentTimeMillis()));
+ assertNull(worker.runner());
+ assertFalse(worker.isCancelled());
+ assertTrue(worker.isDone());
+ }
+
+ @Test
+ void testInterruptFromBody() {
+ List<String> events = new ArrayList<>();
+
+ TestWorkerListener listener = new TestWorkerListener(events);
+
+ IgniteWorker worker = new NoopWorker(log, listener) {
+ /** {@inheritDoc} */
+ @Override
+ protected void body() throws InterruptedException {
+ Thread.currentThread().interrupt();
+
+ throw new InterruptedException();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void cleanup() {
+ events.add(CLEANUP);
+ }
+ };
+
+ worker.run();
+
+ assertThat(listener.events, equalTo(List.of(ON_STARTED, CLEANUP, ON_STOPPED)));
+
+ assertThat(worker.heartbeat(), lessThanOrEqualTo(coarseCurrentTimeMillis()));
+ assertNull(worker.runner());
+ assertFalse(worker.isCancelled());
+ assertTrue(worker.isDone());
+ assertTrue(Thread.interrupted());
+ }
+
+ @Test
+ void testExceptionFromBody() {
+ List<String> events = new ArrayList<>();
+
+ TestWorkerListener listener = new TestWorkerListener(events);
+
+ IgniteWorker worker = new NoopWorker(log, listener) {
+ /** {@inheritDoc} */
+ @Override
+ protected void body() {
+ throw new RuntimeException();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void cleanup() {
+ events.add(CLEANUP);
+ }
+ };
+
+ worker.run();
+
+ assertThat(listener.events, equalTo(List.of(ON_STARTED, CLEANUP, ON_STOPPED)));
+
+ assertThat(worker.heartbeat(), lessThanOrEqualTo(coarseCurrentTimeMillis()));
+ assertNull(worker.runner());
+ assertFalse(worker.isCancelled());
+ assertTrue(worker.isDone());
+ }
+
+ @Test
+ void testErrorFromBody() {
+ List<String> events = new ArrayList<>();
+
+ TestWorkerListener listener = new TestWorkerListener(events);
+
+ IgniteWorker worker = new NoopWorker(log, listener) {
+ /** {@inheritDoc} */
+ @Override
+ protected void body() {
+ throw new Error();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void cleanup() {
+ events.add(CLEANUP);
+ }
+ };
+
+ assertThrows(Error.class, worker::run);
+
+ assertThat(listener.events, equalTo(List.of(ON_STARTED, CLEANUP, ON_STOPPED)));
+
+ assertThat(worker.heartbeat(), lessThanOrEqualTo(coarseCurrentTimeMillis()));
+ assertNull(worker.runner());
+ assertFalse(worker.isCancelled());
+ assertTrue(worker.isDone());
+ }
+
+ @Test
+ void testCancelWorker() {
+ List<String> events = new ArrayList<>();
+
+ TestWorkerListener listener = new TestWorkerListener(events);
+
+ IgniteWorker worker = new NoopWorker(log, listener) {
+ /** {@inheritDoc} */
+ @Override
+ protected void body() {
+ cancel();
+
+ cancel();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void cleanup() {
+ events.add(CLEANUP);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void onCancel(boolean firstCancelRequest) {
+ super.onCancel(firstCancelRequest);
+
+ events.add("firstCancel=" + firstCancelRequest);
+
+ assertTrue(runner().isInterrupted());
+ assertTrue(isCancelled());
+ assertFalse(isDone());
+ }
+ };
+
+ worker.run();
+
+ assertThat(listener.events, equalTo(List.of(ON_STARTED, "firstCancel=true", "firstCancel=false", CLEANUP, ON_STOPPED)));
+
+ assertThat(worker.heartbeat(), lessThanOrEqualTo(coarseCurrentTimeMillis()));
+ assertNull(worker.runner());
+ assertTrue(worker.isCancelled());
+ assertTrue(worker.isDone());
+ assertTrue(Thread.interrupted());
+ }
+
+ @Test
+ void testCancelBeforeStartWorker() {
+ List<String> events = new ArrayList<>();
+
+ TestWorkerListener listener = new TestWorkerListener(events);
+
+ IgniteWorker worker = new NoopWorker(log, listener) {
+ /** {@inheritDoc} */
+ @Override
+ protected void cleanup() {
+ events.add(CLEANUP);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void onCancel(boolean firstCancelRequest) {
+ super.onCancel(firstCancelRequest);
+
+ events.add("firstCancel=" + firstCancelRequest);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void onCancelledBeforeWorkerScheduled() {
+ super.onCancelledBeforeWorkerScheduled();
+
+ events.add("onCancelledBeforeWorkerScheduled");
+
+ assertTrue(runner().isInterrupted());
+ assertTrue(isCancelled());
+ assertFalse(isDone());
+ }
+ };
+
+ worker.cancel();
+
+ worker.run();
+
+ assertThat(
+ listener.events,
+ equalTo(List.of("firstCancel=true", "onCancelledBeforeWorkerScheduled", ON_STARTED, CLEANUP, ON_STOPPED))
+ );
+
+ assertThat(worker.heartbeat(), lessThanOrEqualTo(coarseCurrentTimeMillis()));
+ assertNull(worker.runner());
+ assertTrue(worker.isCancelled());
+ assertTrue(worker.isDone());
+ assertTrue(Thread.interrupted());
+ }
+
+ @Test
+ void testJoin() throws Exception {
+ CountDownLatch finishBodyLatch = new CountDownLatch(1);
+
+ CountDownLatch startLatch = new CountDownLatch(2);
+
+ try {
+ IgniteWorker worker = new NoopWorker(log, null) {
+ /** {@inheritDoc} */
+ @Override
+ protected void body() throws InterruptedException {
+ finishBodyLatch.await(1, TimeUnit.SECONDS);
+ }
+ };
+
+ CompletableFuture<?> runWorkerFuture = runAsync(() -> {
+ startLatch.countDown();
+
+ worker.run();
+ });
+
+ CompletableFuture<?> joinWorkerFuture = runAsync(() -> {
+ startLatch.countDown();
+
+ worker.join();
+ });
+
+ startLatch.await(100, TimeUnit.MILLISECONDS);
+
+ assertThrows(TimeoutException.class, () -> joinWorkerFuture.get(100, TimeUnit.MILLISECONDS));
+
+ finishBodyLatch.countDown();
+
+ runWorkerFuture.get(100, TimeUnit.MILLISECONDS);
+ joinWorkerFuture.get(100, TimeUnit.MILLISECONDS);
+ } finally {
+ finishBodyLatch.countDown();
+ }
+ }
+
+ /**
+ * A worker implementation that does nothing.
+ */
+ private static class NoopWorker extends IgniteWorker {
+ /**
+ * Constructor.
+ *
+ * @param log Logger.
+ * @param listener Listener for life-cycle events.
+ */
+ protected NoopWorker(IgniteLogger log, @Nullable IgniteWorkerListener listener) {
+ super(log, "testNode", "testWorker", listener);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void body() throws InterruptedException {
+ }
+ }
+
+ /**
+ * Test listener implementation that simply collects events.
+ */
+ static class TestWorkerListener implements IgniteWorkerListener {
+ static final String ON_STARTED = "onStarted";
+
+ static final String ON_STOPPED = "onStopped";
+
+ static final String ON_IDLE = "onIdle";
+
+ final List<String> events;
+
+ /**
+ * Constructor.
+ *
+ * @param events For recording events.
+ */
+ private TestWorkerListener(List<String> events) {
+ this.events = events;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void onStarted(IgniteWorker worker) {
+ events.add(ON_STARTED);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void onStopped(IgniteWorker worker) {
+ events.add(ON_STOPPED);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void onIdle(IgniteWorker worker) {
+ events.add(ON_IDLE);
+ }
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java
index cd3c6d2ba..fe24c2e06 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java
@@ -1186,7 +1186,7 @@ public class PageMemoryImpl implements PageMemoryEx {
boolean wasDirty = dirty(absPtr, dirty);
if (dirty) {
- // TODO: IGNITE-16898 Don't forget add assertion for checkpoint lock held by this thread
+ // TODO: IGNITE-16935 Don't forget to add an assertion for checkpoint lock held by this thread
if (!wasDirty || forceAdd) {
Segment seg = segment(pageId.groupId(), pageId.pageId());
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageStoreWriter.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageStoreWriter.java
index 99921e9dd..29f7d257f 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageStoreWriter.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageStoreWriter.java
@@ -33,7 +33,7 @@ public interface PageStoreWriter {
* @param fullPageId Page ID to get byte buffer for. The page ID must be present in the collection returned by the {@link
* PageMemoryImpl#beginCheckpoint(CompletableFuture)} method call.
* @param buf Temporary buffer to write changes into.
- * @param tag {@code Partition generation} if data was read, {@code null} otherwise (data already saved to storage).
+ * @param tag {@code Partition generation} if data was read.
* @throws IgniteInternalCheckedException If write page failed.
*/
void writePage(FullPageId fullPageId, ByteBuffer buf, int tag) throws IgniteInternalCheckedException;
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpoint.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpoint.java
new file mode 100644
index 000000000..54904cc38
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpoint.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+
+/**
+ * Data class of checkpoint information.
+ */
+class Checkpoint {
+ /** Checkpoint pages. */
+ final IgniteConcurrentMultiPairQueue<PageMemoryImpl, FullPageId> dirtyPages;
+
+ /** Checkpoint progress status. */
+ final CheckpointProgressImpl progress;
+
+ /** Number of dirty pages. */
+ final int dirtyPagesSize;
+
+ /**
+ * Constructor.
+ *
+ * @param dirtyPages Pages to write to the page store.
+ * @param progress Checkpoint progress status.
+ */
+ Checkpoint(
+ IgniteConcurrentMultiPairQueue<PageMemoryImpl, FullPageId> dirtyPages,
+ CheckpointProgressImpl progress
+ ) {
+ this.dirtyPages = dirtyPages;
+ this.progress = progress;
+
+ dirtyPagesSize = dirtyPages.initialSize();
+ }
+
+ /**
+ * Returns {@code true} if this checkpoint contains at least one dirty page.
+ */
+ public boolean hasDelta() {
+ return dirtyPagesSize != 0;
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesInfoHolder.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesInfoHolder.java
new file mode 100644
index 000000000..823a116b7
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesInfoHolder.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import java.util.Collection;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+/**
+ * Container with the dirty pages collected for a checkpoint, kept separately for each {@link PageMemoryImpl}.
+ */
+class CheckpointDirtyPagesInfoHolder {
+    /** Dirty pages, one tuple per {@link PageMemoryImpl} with the pages that belong to it. */
+    final Collection<IgniteBiTuple<PageMemoryImpl, Collection<FullPageId>>> dirtyPages;
+
+    /** Total number of dirty pages, as passed to the constructor. */
+    final int dirtyPageCount;
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Dirty pages, one tuple per {@link PageMemoryImpl}.
+     * @param dirtyPageCount Total number of dirty pages.
+     */
+    public CheckpointDirtyPagesInfoHolder(
+            Collection<IgniteBiTuple<PageMemoryImpl, Collection<FullPageId>>> dirtyPages,
+            int dirtyPageCount
+    ) {
+        this.dirtyPageCount = dirtyPageCount;
+        this.dirtyPages = dirtyPages;
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointListener.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointListener.java
new file mode 100644
index 000000000..345b95b2c
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointListener.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * Listener whose methods are invoked at the corresponding stages of the checkpoint life cycle.
+ *
+ * <p>Every callback has an empty default implementation, so implementors override only the stages they need.
+ */
+public interface CheckpointListener {
+    /**
+     * Invoked before the checkpoint starts.
+     *
+     * <p>The checkpoint read lock is held during the call.
+     *
+     * @param progress Progress of the current checkpoint.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    default void beforeCheckpointBegin(CheckpointProgress progress) throws IgniteInternalCheckedException {
+        // No-op.
+    }
+
+    /**
+     * Invoked at the checkpoint start mark.
+     *
+     * <p>The checkpoint write lock is held during the call.
+     *
+     * @param progress Progress of the current checkpoint.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    default void onMarkCheckpointBegin(CheckpointProgress progress) throws IgniteInternalCheckedException {
+        // No-op.
+    }
+
+    /**
+     * Invoked at the beginning of the checkpoint.
+     *
+     * <p>Called after the checkpoint write lock has been released.
+     *
+     * @param progress Progress of the current checkpoint.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    default void onCheckpointBegin(CheckpointProgress progress) throws IgniteInternalCheckedException {
+        // No-op.
+    }
+
+    /**
+     * Invoked after the checkpoint has ended.
+     *
+     * <p>Neither the checkpoint read lock nor the checkpoint write lock is held during the call.
+     *
+     * @param progress Progress of the current checkpoint.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    default void afterCheckpointEnd(CheckpointProgress progress) throws IgniteInternalCheckedException {
+        // No-op.
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorage.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorage.java
new file mode 100644
index 000000000..a1f858bf5
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorage.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.nio.file.Files.createDirectories;
+import static java.nio.file.Files.createFile;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.isDirectory;
+import static java.nio.file.Files.list;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.partitioningBy;
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toList;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Abstraction responsible for managing checkpoint markers storage.
+ *
+ * <p>Markers are empty files named {@code <checkpointId>-START.bin} and {@code <checkpointId>-END.bin} located in the {@code cp}
+ * subdirectory of the storage path.
+ */
+// TODO: IGNITE-15818 At the moment, a simple implementation has been made, it may need to be redone, you need to check it
+public class CheckpointMarkersStorage {
+    /** Checkpoint start marker. */
+    private static final String CHECKPOINT_START_MARKER = "START";
+
+    /** Checkpoint end marker. */
+    private static final String CHECKPOINT_END_MARKER = "END";
+
+    /** Checkpoint marker file name pattern. */
+    private static final Pattern CHECKPOINT_MARKER_FILE_NAME_PATTERN = Pattern.compile("(.*)-(START|END)\\.bin");
+
+    /** Checkpoint metadata directory ("cp"), contains files with checkpoint start and end markers. */
+    private final Path checkpointDir;
+
+    /** Checkpoint IDs. */
+    private final Set<UUID> checkpointIds;
+
+    /**
+     * Constructor.
+     *
+     * <p>Creates the checkpoint metadata directory if needed, validates its content and loads IDs of the existing checkpoints.
+     *
+     * @param storagePath Storage path.
+     * @throws IgniteInternalCheckedException If the directory could not be created or read, if it contains files that are not
+     *      checkpoint markers, or if it contains a checkpoint without an end marker.
+     */
+    public CheckpointMarkersStorage(
+            Path storagePath
+    ) throws IgniteInternalCheckedException {
+        checkpointDir = storagePath.resolve("cp");
+
+        try {
+            createDirectories(checkpointDir);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not create directory for checkpoint metadata: " + checkpointDir, e);
+        }
+
+        checkCheckpointDir(checkpointDir);
+
+        // The stream returned by Files.list holds an open directory handle, so it has to be closed explicitly.
+        try (Stream<Path> markerFiles = list(checkpointDir)) {
+            checkpointIds = markerFiles
+                    .map(CheckpointMarkersStorage::parseCheckpointIdFromMarkerFile)
+                    .collect(toCollection(ConcurrentHashMap::newKeySet));
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not read checkpoint markers: " + checkpointDir, e);
+        }
+    }
+
+    /**
+     * Callback at the start of the checkpoint.
+     *
+     * <p>Creates a start marker for a checkpoint.
+     *
+     * @param checkpointId Checkpoint id.
+     * @throws IgniteInternalCheckedException If the start marker file could not be created.
+     */
+    public void onCheckpointBegin(UUID checkpointId) throws IgniteInternalCheckedException {
+        assert !checkpointIds.contains(checkpointId) : checkpointId;
+
+        Path checkpointStartMarker = checkpointDir.resolve(checkpointMarkerFileName(checkpointId, CHECKPOINT_START_MARKER));
+
+        try {
+            createFile(checkpointStartMarker);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not create start checkpoint marker: " + checkpointStartMarker, e);
+        }
+
+        checkpointIds.add(checkpointId);
+    }
+
+    /**
+     * Callback at the end of the checkpoint.
+     *
+     * <p>Creates an end marker for a checkpoint and then removes the markers of all other known checkpoints.
+     *
+     * @param checkpointId Checkpoint id.
+     * @throws IgniteInternalCheckedException If the end marker file could not be created.
+     */
+    public void onCheckpointEnd(UUID checkpointId) throws IgniteInternalCheckedException {
+        assert checkpointIds.contains(checkpointId) : checkpointId;
+
+        Path checkpointEndMarker = checkpointDir.resolve(checkpointMarkerFileName(checkpointId, CHECKPOINT_END_MARKER));
+
+        try {
+            createFile(checkpointEndMarker);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not create end checkpoint marker: " + checkpointEndMarker, e);
+        }
+
+        // Only the markers of the checkpoint that has just finished are kept.
+        for (Iterator<UUID> it = checkpointIds.iterator(); it.hasNext(); ) {
+            UUID id = it.next();
+
+            if (!id.equals(checkpointId)) {
+                removeCheckpointMarkers(id);
+
+                it.remove();
+            }
+        }
+    }
+
+    /**
+     * Removes the start and end marker files of the given checkpoint, if they exist.
+     *
+     * <p>Removal is best effort: a failed deletion is not reported.
+     *
+     * @param checkpointId Checkpoint id.
+     */
+    private void removeCheckpointMarkers(UUID checkpointId) {
+        Path startMarker = checkpointDir.resolve(checkpointMarkerFileName(checkpointId, CHECKPOINT_START_MARKER));
+        Path endMarker = checkpointDir.resolve(checkpointMarkerFileName(checkpointId, CHECKPOINT_END_MARKER));
+
+        if (exists(startMarker)) {
+            startMarker.toFile().delete();
+        }
+
+        if (exists(endMarker)) {
+            endMarker.toFile().delete();
+        }
+    }
+
+    /**
+     * Checks that the directory contains only checkpoint marker files and that every checkpoint found has an end marker.
+     *
+     * @param checkpointDir Checkpoint metadata directory.
+     * @throws IgniteInternalCheckedException If the directory could not be read, contains files that are not checkpoint markers,
+     *      or contains a checkpoint without an end marker.
+     */
+    private static void checkCheckpointDir(Path checkpointDir) throws IgniteInternalCheckedException {
+        assert isDirectory(checkpointDir) : checkpointDir;
+
+        Map<Boolean, List<Path>> files;
+
+        // The stream returned by Files.list holds an open directory handle, so it has to be closed explicitly.
+        try (Stream<Path> dirContent = list(checkpointDir)) {
+            files = dirContent.collect(partitioningBy(path -> parseCheckpointIdFromMarkerFile(path) != null));
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not read checkpoint markers: " + checkpointDir, e);
+        }
+
+        if (!files.get(false).isEmpty()) {
+            throw new IgniteInternalCheckedException(
+                    "Not checkpoint markers found, they need to be removed manually: " + files.get(false)
+            );
+        }
+
+        Map<UUID, List<Path>> checkpointMarkers = files.get(true).stream()
+                .collect(groupingBy(CheckpointMarkersStorage::parseCheckpointIdFromMarkerFile));
+
+        // Match the exact end marker suffix rather than any occurrence of "END" in the file name.
+        String endMarkerSuffix = "-" + CHECKPOINT_END_MARKER + ".bin";
+
+        List<UUID> checkpointsWithoutEndMarker = checkpointMarkers.entrySet().stream()
+                .filter(e -> e.getValue().stream().noneMatch(path -> path.getFileName().toString().endsWith(endMarkerSuffix)))
+                .map(Entry::getKey)
+                .collect(toList());
+
+        if (!checkpointsWithoutEndMarker.isEmpty()) {
+            throw new IgniteInternalCheckedException("Found incomplete checkpoints: " + checkpointsWithoutEndMarker);
+        }
+    }
+
+    /**
+     * Builds the marker file name for the given checkpoint and marker type.
+     *
+     * @param id Checkpoint id.
+     * @param checkpointMarker Marker type, {@link #CHECKPOINT_START_MARKER} or {@link #CHECKPOINT_END_MARKER}.
+     */
+    private static String checkpointMarkerFileName(UUID id, String checkpointMarker) {
+        return id + "-" + checkpointMarker + ".bin";
+    }
+
+    /**
+     * Extracts the checkpoint id from a marker file name.
+     *
+     * @param checkpointMarkerPath Path to the file.
+     * @return Checkpoint id, or {@code null} if the file name is not a valid checkpoint marker name.
+     */
+    private static @Nullable UUID parseCheckpointIdFromMarkerFile(Path checkpointMarkerPath) {
+        Matcher matcher = CHECKPOINT_MARKER_FILE_NAME_PATTERN.matcher(checkpointMarkerPath.getFileName().toString());
+
+        if (!matcher.matches()) {
+            return null;
+        }
+
+        try {
+            return UUID.fromString(matcher.group(1));
+        } catch (IllegalArgumentException ignored) {
+            // The name has a marker suffix but its prefix is not a UUID, so the file is reported as a non-marker file
+            // instead of failing with an unchecked exception.
+            return null;
+        }
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgress.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgress.java
index 7c9d5eb89..00f37f8a8 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgress.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgress.java
@@ -17,16 +17,36 @@
package org.apache.ignite.internal.pagememory.persistence.checkpoint;
-
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import org.jetbrains.annotations.Nullable;
/**
* Represents information of progress of a current checkpoint and allows obtaining future to wait for a particular checkpoint state.
*/
-// TODO: IGNITE-16898 Continue porting the code
public interface CheckpointProgress {
+ /**
+ * Returns checkpoint ID.
+ */
+ UUID id();
+
+ /**
+ * Returns description of the reason of the current checkpoint.
+ */
+ @Nullable String reason();
+
+ /**
+ * Returns {@code true} if the checkpoint has already started but has not finished yet.
+ */
+ boolean inProgress();
+
/**
* Returns future which can be used for detection when current checkpoint reaches the specific state.
*/
CompletableFuture<?> futureFor(CheckpointState state);
+
+ /**
+ * Returns number of dirty pages in current checkpoint. If checkpoint is not running, returns {@code 0}.
+ */
+ int currentCheckpointPagesCount();
}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
new file mode 100644
index 000000000..f067ca3c8
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.SCHEDULED;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Data class representing the state of running/scheduled checkpoint.
+ */
+// TODO: IGNITE-16950 Check for a race between futureFor, transitTo and fail
+class CheckpointProgressImpl implements CheckpointProgress {
+ /** Checkpoint id. */
+ private final UUID id = UUID.randomUUID();
+
+ /** Scheduled time of checkpoint. */
+ private volatile long nextCheckpointNanos;
+
+ /** Current checkpoint state. */
+ private volatile AtomicReference<CheckpointState> state = new AtomicReference<>(SCHEDULED);
+
+ /** Future which would be finished when corresponds state is set. */
+ private final Map<CheckpointState, CompletableFuture<Void>> stateFutures = new ConcurrentHashMap<>();
+
+ /** Wakeup reason. */
+ private volatile String reason;
+
+ /** Number of dirty pages in current checkpoint at the beginning of checkpoint. */
+ private volatile int currCheckpointPagesCnt;
+
+ /** Cause of fail, which has happened during the checkpoint or {@code null} if checkpoint was successful. */
+ @Nullable
+ private volatile Throwable failCause;
+
+ /** Counter for written checkpoint pages. Not {@link null} only if checkpoint is running. */
+ @Nullable
+ private volatile AtomicInteger writtenPagesCntr;
+
+ /** Counter for fsynced checkpoint pages. Not {@link null} only if checkpoint is running. */
+ @Nullable
+ private volatile AtomicInteger syncedPagesCntr;
+
+ /** Counter for evicted checkpoint pages. Not {@link null} only if checkpoint is running. */
+ @Nullable
+ private volatile AtomicInteger evictedPagesCntr;
+
+ /**
+ * Constructor.
+ *
+ * @param delay Delay in nanos before next checkpoint is to be executed. Value is from {@code 0} to {@code 365} days.
+ */
+ CheckpointProgressImpl(long delay) {
+ nextCheckpointNanos(delay);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public UUID id() {
+ return id;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public @Nullable String reason() {
+ return reason;
+ }
+
+ /**
+ * Sets description of the reason of the current checkpoint.
+ *
+ * @param reason New wakeup reason.
+ */
+ public void reason(String reason) {
+ this.reason = reason;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean inProgress() {
+ return greaterOrEqualTo(LOCK_RELEASED) && !greaterOrEqualTo(FINISHED);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<Void> futureFor(CheckpointState state) {
+ CompletableFuture<Void> stateFut = stateFutures.computeIfAbsent(state, (k) -> new CompletableFuture<>());
+
+ if (greaterOrEqualTo(state)) {
+ completeFuture(stateFut, failCause);
+ }
+
+ return stateFut;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int currentCheckpointPagesCount() {
+ return currCheckpointPagesCnt;
+ }
+
+ /**
+ * Sets current checkpoint pages num to store.
+ *
+ * @param num Pages to store.
+ */
+ public void currentCheckpointPagesCount(int num) {
+ currCheckpointPagesCnt = num;
+ }
+
+ /**
+ * Returns counter for written checkpoint pages. Not {@code null} only if checkpoint is running.
+ */
+ public @Nullable AtomicInteger writtenPagesCounter() {
+ return writtenPagesCntr;
+ }
+
+ /**
+ * Returns counter for fsynced checkpoint pages. Not {@code null} only if checkpoint is running.
+ */
+ public @Nullable AtomicInteger syncedPagesCounter() {
+ return syncedPagesCntr;
+ }
+
+ /**
+ * Returns Counter for evicted pages during current checkpoint. Not {@code null} only if checkpoint is running.
+ */
+ public @Nullable AtomicInteger evictedPagesCounter() {
+ return evictedPagesCntr;
+ }
+
+ /**
+ * Returns scheduled time of checkpoint in nanos.
+ */
+ public long nextCheckpointNanos() {
+ return nextCheckpointNanos;
+ }
+
+ /**
+ * Sets new scheduled time of checkpoint in nanos.
+ *
+ * @param delay Delay in nanos before next checkpoint is to be executed. Value is from {@code 0} to {@code 365} days.
+ */
+ public void nextCheckpointNanos(long delay) {
+ assert delay >= 0 : delay;
+ assert delay <= TimeUnit.DAYS.toNanos(365) : delay;
+
+ this.nextCheckpointNanos = System.nanoTime() + delay;
+ }
+
+ /**
+ * Clear checkpoint progress counters.
+ */
+ public void clearCounters() {
+ currCheckpointPagesCnt = 0;
+
+ writtenPagesCntr = null;
+ syncedPagesCntr = null;
+ evictedPagesCntr = null;
+ }
+
+ /**
+ * Initialize all counters before checkpoint.
+ *
+ * @param pagesSize Number of dirty pages in current checkpoint at the beginning of checkpoint.
+ */
+ public void initCounters(int pagesSize) {
+ currCheckpointPagesCnt = pagesSize;
+
+ writtenPagesCntr = new AtomicInteger();
+ syncedPagesCntr = new AtomicInteger();
+ evictedPagesCntr = new AtomicInteger();
+ }
+
+ /**
+ * Changing checkpoint state if order of state is correct.
+ *
+ * @param newState New checkpoint state.
+ */
+ public void transitTo(CheckpointState newState) {
+ CheckpointState state = this.state.get();
+
+ if (state.ordinal() < newState.ordinal()) {
+ this.state.compareAndSet(state, newState);
+
+ doFinishFuturesWhichLessOrEqualTo(newState);
+ }
+ }
+
+ /**
+ * Mark this checkpoint execution as failed.
+ *
+ * @param error Causal error of fail.
+ */
+ public void fail(Throwable error) {
+ failCause = error;
+
+ transitTo(FINISHED);
+ }
+
+ /**
+ * Returns {@code true} if current state equal or greater to given state.
+ *
+ * @param expectedState Expected state.
+ */
+ public boolean greaterOrEqualTo(CheckpointState expectedState) {
+ return state.get().ordinal() >= expectedState.ordinal();
+ }
+
+ /**
+ * Returns current state.
+ */
+ CheckpointState state() {
+ return state.get();
+ }
+
+ /**
+ * Finishing futures with correct result in direct state order until lastState(included).
+ *
+ * @param lastState State until which futures should be done.
+ */
+ private void doFinishFuturesWhichLessOrEqualTo(CheckpointState lastState) {
+ for (CheckpointState old : CheckpointState.values()) {
+ completeFuture(stateFutures.get(old), failCause);
+
+ if (old == lastState) {
+ return;
+ }
+ }
+ }
+
+ private static void completeFuture(@Nullable CompletableFuture<?> future, @Nullable Throwable throwable) {
+ if (future != null && !future.isDone()) {
+ if (throwable != null) {
+ future.completeExceptionally(throwable);
+ } else {
+ future.complete(null);
+ }
+ }
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLock.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLock.java
index 339f9a28f..e83a542b5 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLock.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLock.java
@@ -30,7 +30,7 @@ public class CheckpointReadWriteLock {
* Any thread with a such prefix is managed by the checkpoint. So some conditions can rely on it(ex. we don't need a checkpoint lock
* there because checkpoint is already held write lock).
*/
- // TODO: IGNITE-16898 I think it needs to be redone or relocated
+ // TODO: IGNITE-16935 I think it needs to be redone or relocated
static final String CHECKPOINT_RUNNER_THREAD_PREFIX = "checkpoint-runner";
/** Checkpoint lock. */
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointState.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointState.java
index b246e7736..17b33fe75 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointState.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointState.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.pagememory.persistence.checkpoint;
/**
* Possible checkpoint states. Ordinal is important. Every next state follows the previous one.
*/
-// TODO: IGNITE-16898 Review states
public enum CheckpointState {
/** Checkpoint is waiting to execution. **/
SCHEDULED,
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
new file mode 100644
index 000000000..8e94086b2
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.stream.Collectors.toUnmodifiableList;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_TAKEN;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.MARKER_STORED_TO_DISK;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.PAGE_SNAPSHOT_TAKEN;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.IgniteConcurrentMultiPairQueue.EMPTY;
+import static org.apache.ignite.lang.IgniteSystemProperties.getInteger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinWorkerThread;
+import java.util.concurrent.Future;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * This class's responsibility is to complement the {@link Checkpointer} class with the side logic of checkpointing, such as notifying
+ * checkpoint listeners, collecting dirty pages, etc.
+ *
+ * <p>It allows the {@link Checkpointer} class to focus on its main responsibility: synchronizing memory with disk.
+ *
+ * <p>Additional actions needed during checkpoint are implemented in this class.
+ *
+ * <p>Two main blocks of logic this class is responsible for:
+ *
+ * <p>{@link CheckpointWorkflow#markCheckpointBegin} - Initialization of next checkpoint. It collects all required info.
+ *
+ * <p>{@link CheckpointWorkflow#markCheckpointEnd} - Finalization of last checkpoint.
+ */
+class CheckpointWorkflow implements IgniteComponent {
+ /**
+ * Name of the property under which the parallel sort threshold is looked up (through {@code IgniteSystemProperties.getInteger}).
+ * The threshold itself is the number of dirty pages starting from which a page array is sorted in parallel.
+ */
+ public static final String CHECKPOINT_PARALLEL_SORT_THRESHOLD = "CHECKPOINT_PARALLEL_SORT_THRESHOLD";
+
+ /**
+ * Starting from this number of dirty pages in a single data region, the region's page array will be sorted with
+ * {@link Arrays#parallelSort(Object[], Comparator)} in case of {@link CheckpointWriteOrder#SEQUENTIAL}; smaller arrays are sorted
+ * with {@link Arrays#sort(Object[], Comparator)}. Falls back to {@code 512 * 1024} when
+ * {@link #CHECKPOINT_PARALLEL_SORT_THRESHOLD} is not set.
+ */
+ // TODO: IGNITE-16935 Move to configuration
+ private final int parallelSortThreshold = getInteger(CHECKPOINT_PARALLEL_SORT_THRESHOLD, 512 * 1024);
+
+ /**
+ * Base number of threads for parallel sorting: the number of available processors, capped at 8. The sorting pool is created with a
+ * parallelism of this value plus one.
+ */
+ private static final int PARALLEL_SORT_THREADS = Math.min(Runtime.getRuntime().availableProcessors(), 8);
+
+ /** Checkpoint marker storage. */
+ private final CheckpointMarkersStorage checkpointMarkersStorage;
+
+ /** Checkpoint lock. */
+ private final CheckpointReadWriteLock checkpointReadWriteLock;
+
+ /** Persistent data regions for the checkpointing. */
+ private final Collection<PageMemoryDataRegion> dataRegions;
+
+ /** Checkpoint write order configuration. */
+ private final CheckpointWriteOrder checkpointWriteOrder;
+
+ /**
+ * Registered checkpoint listeners, each paired with the data region it was registered for; a {@code null} region means the listener
+ * applies to all regions.
+ */
+ private final List<IgniteBiTuple<CheckpointListener, PageMemoryDataRegion>> listeners = new CopyOnWriteArrayList<>();
+
+ /**
+  * Creates a checkpoint workflow over the given collaborators.
+  *
+  * @param checkpointMarkersStorage Storage used to persist checkpoint start/end markers.
+  * @param checkpointReadWriteLock Lock that guards the checkpoint critical sections.
+  * @param checkpointWriteOrder Order in which checkpoint pages are to be written.
+  * @param dataRegions Persistent data regions taking part in checkpoints; the collection is stored as is, without copying.
+  */
+ public CheckpointWorkflow(
+         CheckpointMarkersStorage checkpointMarkersStorage,
+         CheckpointReadWriteLock checkpointReadWriteLock,
+         CheckpointWriteOrder checkpointWriteOrder,
+         Collection<PageMemoryDataRegion> dataRegions
+ ) {
+     this.dataRegions = dataRegions;
+     this.checkpointWriteOrder = checkpointWriteOrder;
+     this.checkpointReadWriteLock = checkpointReadWriteLock;
+     this.checkpointMarkersStorage = checkpointMarkersStorage;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void start() {
+ // No-op: nothing is initialized on start.
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void stop() {
+ // Forget every registered checkpoint listener.
+ listeners.clear();
+ }
+
+ /**
+ * First stage of checkpoint which collects demanded information (dirty pages mostly).
+ *
+ * <p>The stage runs in the following order: listeners are notified with {@code beforeCheckpointBegin} under the checkpoint read lock;
+ * then, under the checkpoint write lock, the progress transits to {@code LOCK_TAKEN}, listeners are notified with
+ * {@code onMarkCheckpointBegin}, dirty pages of all data regions are collected and the progress transits to
+ * {@code PAGE_SNAPSHOT_TAKEN}; after the write lock is released the progress transits to {@code LOCK_RELEASED} and listeners are
+ * notified with {@code onCheckpointBegin}. The start marker is written and the progress transits to {@code MARKER_STORED_TO_DISK}
+ * only if at least one dirty page was collected.
+ *
+ * @param startCheckpointTimestamp Checkpoint start timestamp (currently not used by this method).
+ * @param curr Current checkpoint event info.
+ * @return Checkpoint collected info; holds an empty page queue when there are no dirty pages.
+ * @throws IgniteInternalCheckedException If failed.
+ */
+ public Checkpoint markCheckpointBegin(
+ long startCheckpointTimestamp,
+ CheckpointProgressImpl curr
+ ) throws IgniteInternalCheckedException {
+ // The listener list is captured once and reused for every notification of this stage.
+ List<CheckpointListener> listeners = collectCheckpointListeners(dataRegions);
+
+ checkpointReadWriteLock.readLock();
+
+ try {
+ for (CheckpointListener listener : listeners) {
+ listener.beforeCheckpointBegin(curr);
+ }
+ } finally {
+ checkpointReadWriteLock.readUnlock();
+ }
+
+ checkpointReadWriteLock.writeLock();
+
+ CheckpointDirtyPagesInfoHolder dirtyPages;
+
+ try {
+ curr.transitTo(LOCK_TAKEN);
+
+ for (CheckpointListener listener : listeners) {
+ listener.onMarkCheckpointBegin(curr);
+ }
+
+ // Pages are allowed to be replaced only after the checkpoint marker has been stored to disk.
+ dirtyPages = beginCheckpoint(dataRegions, curr.futureFor(MARKER_STORED_TO_DISK));
+
+ curr.currentCheckpointPagesCount(dirtyPages.dirtyPageCount);
+
+ curr.transitTo(PAGE_SNAPSHOT_TAKEN);
+ } finally {
+ checkpointReadWriteLock.writeUnlock();
+ }
+
+ // From here on the write lock is no longer held.
+ curr.transitTo(LOCK_RELEASED);
+
+ for (CheckpointListener listener : listeners) {
+ listener.onCheckpointBegin(curr);
+ }
+
+ if (dirtyPages.dirtyPageCount > 0) {
+ // Persist the start marker before reporting the MARKER_STORED_TO_DISK state.
+ checkpointMarkersStorage.onCheckpointBegin(curr.id());
+
+ curr.transitTo(MARKER_STORED_TO_DISK);
+
+ return new Checkpoint(splitAndSortCpPagesIfNeeded(dirtyPages), curr);
+ }
+
+ // Nothing to write: no marker is stored and no transition to MARKER_STORED_TO_DISK is made by this method.
+ return new Checkpoint(EMPTY, curr);
+ }
+
+ /**
+ * Do some actions on checkpoint finish (After all pages were written to disk).
+ *
+ * <p>Clears the progress counters and finishes the checkpoint in the page memory of every data region, then stores the end marker,
+ * notifies the listeners with {@code afterCheckpointEnd} and finally transits the progress to {@code FINISHED}.
+ *
+ * @param chp Checkpoint snapshot.
+ * @throws IgniteInternalCheckedException If failed.
+ */
+ public void markCheckpointEnd(Checkpoint chp) throws IgniteInternalCheckedException {
+ // NOTE(review): the reason for synchronizing on this workflow instance is not evident from this class - confirm.
+ synchronized (this) {
+ chp.progress.clearCounters();
+
+ for (PageMemoryDataRegion dataRegion : dataRegions) {
+ assert dataRegion.persistent() : dataRegion;
+
+ ((PageMemoryImpl) dataRegion.pageMemory()).finishCheckpoint();
+ }
+ }
+
+ // The end marker is stored regardless of whether a start marker was stored for this checkpoint.
+ checkpointMarkersStorage.onCheckpointEnd(chp.progress.id());
+
+ // Listeners are collected anew here, so the set may differ from the one notified at checkpoint begin.
+ for (CheckpointListener listener : collectCheckpointListeners(dataRegions)) {
+ listener.afterCheckpointEnd(chp.progress);
+ }
+
+ chp.progress.transitTo(FINISHED);
+ }
+
+ /**
+  * Registers a checkpoint listener, optionally bound to a single persistent data region.
+  *
+  * @param listener Listener to register.
+  * @param dataRegion Persistent data region the listener is bound to, or {@code null} to be notified for all regions.
+  */
+ public void addCheckpointListener(CheckpointListener listener, @Nullable PageMemoryDataRegion dataRegion) {
+     assert dataRegion == null || (dataRegion.persistent() && dataRegions.contains(dataRegion)) : dataRegion;
+
+     IgniteBiTuple<CheckpointListener, PageMemoryDataRegion> registration = new IgniteBiTuple<>(listener, dataRegion);
+
+     listeners.add(registration);
+ }
+
+ /**
+  * Removes the listener, i.e. every registration of it regardless of the data region it was registered for.
+  *
+  * <p>Does nothing if the listener has not been registered.
+  *
+  * @param listener Listener.
+  */
+ public void removeCheckpointListener(CheckpointListener listener) {
+     // Listeners are matched by identity. A predicate is used instead of a probe tuple with an overridden equals(), which would
+     // violate the equals/hashCode contract and would leave later registrations of the same listener in place.
+     listeners.removeIf(registration -> registration.getKey() == listener);
+ }
+
+ /**
+  * Returns the listeners applicable to the given data regions: those registered for all regions ({@code null} region) and those
+  * registered for one of the given regions.
+  *
+  * @param dataRegions Data regions.
+  * @return Unmodifiable list of matching listeners, in registration order.
+  */
+ public List<CheckpointListener> collectCheckpointListeners(Collection<PageMemoryDataRegion> dataRegions) {
+     List<CheckpointListener> matched = new ArrayList<>();
+
+     for (IgniteBiTuple<CheckpointListener, PageMemoryDataRegion> registration : listeners) {
+         PageMemoryDataRegion region = registration.getValue();
+
+         if (region == null || dataRegions.contains(region)) {
+             matched.add(registration.getKey());
+         }
+     }
+
+     return List.copyOf(matched);
+ }
+
+ /**
+  * Begins the checkpoint in the page memory of every data region and gathers the dirty pages each of them reports.
+  *
+  * @param dataRegions Data regions, all expected to be persistent.
+  * @param allowToReplace Future passed to every page memory when its checkpoint is begun.
+  * @return Dirty pages per page memory together with their total count.
+  */
+ private CheckpointDirtyPagesInfoHolder beginCheckpoint(
+         Collection<PageMemoryDataRegion> dataRegions,
+         CompletableFuture<?> allowToReplace
+ ) {
+     Collection<IgniteBiTuple<PageMemoryImpl, Collection<FullPageId>>> dirtyPagesPerRegion = new ArrayList<>(dataRegions.size());
+
+     int totalDirtyPages = 0;
+
+     for (PageMemoryDataRegion region : dataRegions) {
+         assert region.persistent() : region;
+
+         PageMemoryImpl pageMemory = (PageMemoryImpl) region.pageMemory();
+
+         Collection<FullPageId> regionDirtyPages = pageMemory.beginCheckpoint(allowToReplace);
+
+         dirtyPagesPerRegion.add(new IgniteBiTuple<>(pageMemory, regionDirtyPages));
+
+         totalDirtyPages += regionDirtyPages.size();
+     }
+
+     return new CheckpointDirtyPagesInfoHolder(dirtyPagesPerRegion, totalDirtyPages);
+ }
+
+ /**
+  * Sorts the pages array with {@link Arrays#parallelSort(Object[], Comparator)} inside a dedicated {@link ForkJoinPool} and waits
+  * for the sort to complete.
+  *
+  * <p>If no pool is passed, a new one is created with worker threads named {@code checkpoint-pages-sorter-N}. On success the pool
+  * that was used is returned so that the caller can reuse it and eventually shut it down. On failure a pool created by this call is
+  * shut down here, because the caller never gets a reference to it; a pool passed by the caller is left for the caller to release.
+  *
+  * @param pagesArr Pages to sort in place.
+  * @param cmp Comparator defining the order.
+  * @param pool Pool to reuse, or {@code null} to create a new one.
+  * @return Pool the sort was executed in.
+  * @throws IgniteInternalCheckedException If the sort failed or the waiting thread was interrupted; in the latter case the interrupt
+  *      status of the thread is restored.
+  */
+ private static ForkJoinPool parallelSortInIsolatedPool(
+         FullPageId[] pagesArr,
+         Comparator<FullPageId> cmp,
+         @Nullable ForkJoinPool pool
+ ) throws IgniteInternalCheckedException {
+     ForkJoinPool execPool = pool;
+
+     if (execPool == null) {
+         ForkJoinPool.ForkJoinWorkerThreadFactory factory = newPool -> {
+             ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(newPool);
+
+             worker.setName("checkpoint-pages-sorter-" + worker.getPoolIndex());
+
+             return worker;
+         };
+
+         execPool = new ForkJoinPool(PARALLEL_SORT_THREADS + 1, factory, null, false);
+     }
+
+     boolean sorted = false;
+
+     try {
+         Future<?> sortTask = execPool.submit(() -> Arrays.parallelSort(pagesArr, cmp));
+
+         sortTask.get();
+
+         sorted = true;
+     } catch (ExecutionException e) {
+         throw new IgniteInternalCheckedException("Failed to perform pages array parallel sort", e.getCause());
+     } catch (InterruptedException e) {
+         // Restore the interrupt status so that code up the stack can still observe the interruption.
+         Thread.currentThread().interrupt();
+
+         throw new IgniteInternalCheckedException("Failed to perform pages array parallel sort", e);
+     } finally {
+         // A pool created by this call is invisible to the caller when the sort fails, so release its threads here.
+         if (!sorted && pool == null) {
+             execPool.shutdownNow();
+         }
+     }
+
+     return execPool;
+ }
+
+ /**
+  * Copies the dirty pages of every region into arrays and, for {@link CheckpointWriteOrder#SEQUENTIAL}, sorts each array by group
+  * ID and then by effective page ID. Arrays with at least {@code parallelSortThreshold} pages are sorted in parallel in a pool that
+  * is shared between regions and is always shut down before this method exits, including when a sort fails.
+  *
+  * @param dirtyPages Dirty pages collected at the checkpoint begin.
+  * @return Queue over the (possibly sorted) page arrays of all regions.
+  * @throws IgniteInternalCheckedException If a parallel sort failed.
+  */
+ private IgniteConcurrentMultiPairQueue<PageMemoryImpl, FullPageId> splitAndSortCpPagesIfNeeded(
+         CheckpointDirtyPagesInfoHolder dirtyPages
+ ) throws IgniteInternalCheckedException {
+     Set<IgniteBiTuple<PageMemoryImpl, FullPageId[]>> cpPagesPerRegion = new HashSet<>();
+
+     // Only maintained (and only needed) when assertions are enabled, see the assert below.
+     int realPagesArrSize = 0;
+
+     for (IgniteBiTuple<PageMemoryImpl, Collection<FullPageId>> regPages : dirtyPages.dirtyPages) {
+         FullPageId[] pages = new FullPageId[regPages.getValue().size()];
+
+         int pagePos = 0;
+
+         for (FullPageId dirtyPage : regPages.getValue()) {
+             assert realPagesArrSize++ != dirtyPages.dirtyPageCount :
+                     "Incorrect estimated dirty pages number: " + dirtyPages.dirtyPageCount;
+
+             pages[pagePos++] = dirtyPage;
+         }
+
+         // Some pages may have been already replaced.
+         if (pagePos != pages.length) {
+             cpPagesPerRegion.add(new IgniteBiTuple<>(regPages.getKey(), Arrays.copyOf(pages, pagePos)));
+         } else {
+             cpPagesPerRegion.add(new IgniteBiTuple<>(regPages.getKey(), pages));
+         }
+     }
+
+     if (checkpointWriteOrder == CheckpointWriteOrder.SEQUENTIAL) {
+         Comparator<FullPageId> cmp = Comparator.comparingInt(FullPageId::groupId).thenComparingLong(FullPageId::effectivePageId);
+
+         ForkJoinPool pool = null;
+
+         try {
+             for (IgniteBiTuple<PageMemoryImpl, FullPageId[]> pagesPerReg : cpPagesPerRegion) {
+                 FullPageId[] regionPages = pagesPerReg.getValue();
+
+                 if (regionPages.length >= parallelSortThreshold) {
+                     // The pool is created on first use and reused for the remaining regions.
+                     pool = parallelSortInIsolatedPool(regionPages, cmp, pool);
+                 } else {
+                     Arrays.sort(regionPages, cmp);
+                 }
+             }
+         } finally {
+             // Release the sorter threads even if sorting one of the regions has failed.
+             if (pool != null) {
+                 pool.shutdown();
+             }
+         }
+     }
+
+     return new IgniteConcurrentMultiPairQueue<>(cpPagesPerRegion);
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWriteOrder.java
similarity index 52%
copy from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
copy to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWriteOrder.java
index 94056b669..4b48aa2e4 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWriteOrder.java
@@ -20,21 +20,32 @@ package org.apache.ignite.internal.pagememory.persistence.checkpoint;
import org.jetbrains.annotations.Nullable;
/**
- * Empty.
+ * This enum defines order of writing pages to disk storage during checkpoint.
*/
-// TODO: IGNITE-16898 Continue porting the code
-public abstract class Checkpointer {
+public enum CheckpointWriteOrder {
/**
- * Changes the information for a scheduled checkpoint if it was scheduled further than {@code delayFromNow}, or do nothing otherwise.
- *
- * @param delayFromNow Delay from now in milliseconds.
- * @param reason Wakeup reason.
- * @return Nearest scheduled checkpoint which is not started yet (dirty pages weren't collected yet).
+ * Pages are written in order provided by checkpoint pages collection iterator (which is basically a hashtable).
+ */
+ RANDOM,
+
+ /**
+ * All checkpoint pages are collected into single list and sorted by page index. Provides almost sequential disk writes, which can be
+ * much faster on some SSD models.
*/
- public abstract CheckpointProgress scheduleCheckpoint(long delayFromNow, String reason);
+ SEQUENTIAL;
/**
- * Returns runner thread, {@code null} if the worker has not yet started executing.
+ * Enumerated values.
+ */
+ private static final CheckpointWriteOrder[] VALS = values();
+
+ /**
+ * Efficiently gets enumerated value from its ordinal.
+ *
+ * @param ord Ordinal value.
+ * @return Enumerated value or {@code null} if ordinal out of range.
*/
- public abstract @Nullable Thread runner();
+ public static @Nullable CheckpointWriteOrder fromOrdinal(int ord) {
+ return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
+ }
}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
index 94056b669..422feb5f4 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
@@ -17,24 +17,12 @@
package org.apache.ignite.internal.pagememory.persistence.checkpoint;
-import org.jetbrains.annotations.Nullable;
-
/**
* Empty.
*/
-// TODO: IGNITE-16898 Continue porting the code
+// TODO: IGNITE-16935 Continue porting the code
public abstract class Checkpointer {
- /**
- * Changes the information for a scheduled checkpoint if it was scheduled further than {@code delayFromNow}, or do nothing otherwise.
- *
- * @param delayFromNow Delay from now in milliseconds.
- * @param reason Wakeup reason.
- * @return Nearest scheduled checkpoint which is not started yet (dirty pages weren't collected yet).
- */
- public abstract CheckpointProgress scheduleCheckpoint(long delayFromNow, String reason);
+ public abstract Thread runner();
- /**
- * Returns runner thread, {@code null} if the worker has not yet started executing.
- */
- public abstract @Nullable Thread runner();
+ public abstract CheckpointProgress scheduleCheckpoint(long l, String s);
}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteConcurrentMultiPairQueue.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteConcurrentMultiPairQueue.java
new file mode 100644
index 000000000..534c8ec07
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteConcurrentMultiPairQueue.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.function.Predicate.not;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+/**
+ * Concurrent queue that wraps collection of {@code Pair<K, V[]>}.
+ *
+ * <p>{@link #next} guarantees that the value arrays are drained sequentially, one key at a time, i.e. for input like:
+ *
+ * <br> p1 = new Pair<1, [1, 3, 5, 7]>
+ * <br> p2 = new Pair<2, [2, 3]>
+ * <br> p3 = new Pair<3, [200, 100]>
+ * <br> a subsequent sequence of {@link #next} calls may produce output like:
+ * <br> [1, 1], [1, 3], [1, 5], [1, 7], [2, 2], [2, 3], [3, 200], [3, 100]
+ *
+ * @param <K> The type of key in input pair collection.
+ * @param <V> The type of value array.
+ */
+public class IgniteConcurrentMultiPairQueue<K, V> {
+ /** Empty pair queue (raw-typed, built from an empty map). */
+ public static final IgniteConcurrentMultiPairQueue EMPTY = new IgniteConcurrentMultiPairQueue<>(Map.of());
+
+ /** Inner holder: one value array (segment) per non-empty input pair, in the iteration order of the constructor argument. */
+ private final V[][] vals;
+
+ /** For every segment, the absolute position of its last value, i.e. the cumulative length of the segments so far minus one. */
+ private final int[] lenSeq;
+
+ /** Current absolute position, i.e. the position the next {@link #next} call will claim. */
+ private final AtomicInteger pos = new AtomicInteger();
+
+ /** Precalculated max position: the total number of values in all segments. */
+ private final int maxPos;
+
+ /** Keys array; the key at an index corresponds to the segment in {@link #vals} at the same index. */
+ private final K[] keysArr;
+
+ /**
+  * Creates a queue over the given key to values mapping. Pairs with an empty collection are skipped; the values of every other pair
+  * are copied into an array.
+  *
+  * @param items Items.
+  */
+ public IgniteConcurrentMultiPairQueue(Map<K, ? extends Collection<V>> items) {
+     int nonEmptyPairs = 0;
+
+     for (Collection<V> values : items.values()) {
+         if (!values.isEmpty()) {
+             nonEmptyPairs++;
+         }
+     }
+
+     vals = (V[][]) new Object[nonEmptyPairs][];
+     keysArr = (K[]) new Object[nonEmptyPairs];
+     lenSeq = new int[nonEmptyPairs];
+
+     int segment = 0;
+
+     // Absolute position of the last value placed so far; -1 while nothing has been placed.
+     int lastAbsPos = -1;
+
+     for (Map.Entry<K, ? extends Collection<V>> entry : items.entrySet()) {
+         Collection<V> values = entry.getValue();
+
+         if (!values.isEmpty()) {
+             lastAbsPos += values.size();
+
+             keysArr[segment] = entry.getKey();
+             lenSeq[segment] = lastAbsPos;
+             vals[segment] = (V[]) values.toArray();
+
+             segment++;
+         }
+     }
+
+     maxPos = lastAbsPos + 1;
+ }
+
+ /**
+  * Creates a queue over the given key/values pairs. Pairs with an empty array are skipped; the arrays of all other pairs are used
+  * as is, without copying.
+  *
+  * @param items Items.
+  */
+ public IgniteConcurrentMultiPairQueue(Collection<IgniteBiTuple<K, V[]>> items) {
+     int nonEmptyPairs = 0;
+
+     for (Map.Entry<K, V[]> item : items) {
+         if (item.getValue().length > 0) {
+             nonEmptyPairs++;
+         }
+     }
+
+     vals = (V[][]) new Object[nonEmptyPairs][];
+     keysArr = (K[]) new Object[nonEmptyPairs];
+     lenSeq = new int[nonEmptyPairs];
+
+     int segment = 0;
+
+     // Absolute position of the last value placed so far; -1 while nothing has been placed.
+     int lastAbsPos = -1;
+
+     for (Map.Entry<K, V[]> item : items) {
+         V[] values = item.getValue();
+
+         if (values.length != 0) {
+             lastAbsPos += values.length;
+
+             keysArr[segment] = item.getKey();
+             lenSeq[segment] = lastAbsPos;
+             vals[segment] = values;
+
+             segment++;
+         }
+     }
+
+     maxPos = lastAbsPos + 1;
+ }
+
+ /**
+ * Retrieves and removes the head of this queue, or returns {@code false} if this queue is empty.
+ *
+ * <p>Every call claims the next absolute position with an atomic increment, resolves which segment that position belongs to and
+ * stores the corresponding key, value and segment in {@code res}. The segment stored in {@code res} by a previous call is used as
+ * the starting point of the lookup.
+ *
+ * @param res State holder that receives the key and the value; it is cleared when the queue is exhausted.
+ * @return {@code true} if not empty result, or {@code false} if this queue is empty.
+ */
+ public boolean next(Result<K, V> res) {
+ // Claim the next absolute position; the counter keeps growing even after the queue has been exhausted.
+ int absPos = pos.getAndIncrement();
+
+ if (absPos >= maxPos) {
+ // Exhausted: clear the holder and reset its segment to 0.
+ res.set(null, null, 0);
+
+ return false;
+ }
+
+ // Segment remembered by the holder from its previous use (0 for a fresh holder).
+ int segment = res.getSegment();
+
+ if (absPos > lenSeq[segment]) {
+ // The position lies beyond the remembered segment, so search the following ones. The upper bound of binarySearch is
+ // exclusive, hence the last segment is not examined: a position that belongs to it yields an insertion point equal to
+ // its index.
+ segment = Arrays.binarySearch(lenSeq, segment, lenSeq.length - 1, absPos);
+
+ // A negative result encodes the insertion point, which is the first segment whose last position exceeds absPos.
+ segment = segment < 0 ? -segment - 1 : segment;
+ }
+
+ // Position inside the segment: offset from the position that follows the end of the previous segment.
+ int relPos = segment == 0 ? absPos : (absPos - lenSeq[segment - 1] - 1);
+
+ K key = keysArr[segment];
+
+ res.set(key, vals[segment][relPos], segment);
+
+ return true;
+ }
+
+ /**
+ * Returns {@code true} if empty, i.e. if all positions of the queue have already been claimed by {@link #next} calls.
+ */
+ public boolean isEmpty() {
+ return pos.get() >= maxPos;
+ }
+
+ /**
+ * Returns the initial size of the queue: the total number of values it was created with. The value does not change as elements
+ * are retrieved.
+ */
+ public int initialSize() {
+ return maxPos;
+ }
+
+ /**
+ * State holder that receives the outcome of {@link IgniteConcurrentMultiPairQueue#next} and remembers the segment of the last
+ * retrieved element, which the next call uses as the starting point of its segment lookup.
+ *
+ * @param <K> The type of key.
+ * @param <V> The type of value.
+ */
+ public static class Result<K, V> {
+ /** Current segment: index of the segment the last retrieved value belongs to, {@code 0} initially and after exhaustion. */
+ private int segment;
+
+ /** Key holder. */
+ private K key;
+
+ /** Value holder. */
+ private V val;
+
+ /**
+ * Current state setter.
+ *
+ * @param k Key.
+ * @param v Value.
+ * @param seg Segment.
+ */
+ public void set(K k, V v, int seg) {
+ key = k;
+ val = v;
+ segment = seg;
+ }
+
+ /**
+ * Returns current segment.
+ */
+ private int getSegment() {
+ return segment;
+ }
+
+ /**
+ * Returns current key, {@code null} if nothing has been retrieved yet or the queue was exhausted.
+ */
+ public K getKey() {
+ return key;
+ }
+
+ /**
+ * Returns current value, {@code null} if nothing has been retrieved yet or the queue was exhausted.
+ */
+ public V getValue() {
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString() {
+ return S.toString(Result.class, this);
+ }
+ }
+}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImplNoLoadTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImplNoLoadTest.java
index 35ee3233d..cab9e42ae 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImplNoLoadTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImplNoLoadTest.java
@@ -81,7 +81,7 @@ public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest {
assertThat(memory.dirtyPages(), equalTo(dirtyPages));
- // TODO: IGNITE-16898 After the checkpoint check that there are no dirty pages
+ // TODO: IGNITE-16935 After the checkpoint check that there are no dirty pages
} finally {
memory.stop(true);
}
@@ -118,7 +118,7 @@ public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest {
assertFalse(memory.safeToUpdate(), "i=" + i);
}
- // TODO: IGNITE-16898 After the checkpoint check assertTrue(memory.safeToUpdate())
+ // TODO: IGNITE-16935 After the checkpoint check assertTrue(memory.safeToUpdate())
} finally {
memory.stop(true);
}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorageTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorageTest.java
new file mode 100644
index 000000000..1d4d8788a
--- /dev/null
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorageTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.nio.file.Files.createDirectories;
+import static java.nio.file.Files.createFile;
+import static java.nio.file.Files.list;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.util.IgniteUtils.deleteIfExists;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.io.FileWriter;
+import java.nio.file.Path;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.testframework.SystemPropertiesExtension;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * For {@link CheckpointMarkersStorage} testing.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+@ExtendWith(SystemPropertiesExtension.class)
+public class CheckpointMarkersStorageTest {
+    @WorkDirectory
+    private Path workDir;
+
+    /** The storage cannot be created when the given path is a regular file rather than a directory. */
+    @Test
+    void testFailCreateCheckpointDir() throws Exception {
+        Path testFile = createFile(workDir.resolve("testFile"));
+
+        try (FileWriter fileWriter = new FileWriter(testFile.toFile())) {
+            fileWriter.write("testString");
+
+            fileWriter.flush();
+        }
+
+        IgniteInternalCheckedException exception = assertThrows(
+                IgniteInternalCheckedException.class,
+                () -> new CheckpointMarkersStorage(testFile)
+        );
+
+        assertThat(exception.getMessage(), startsWith("Could not create directory for checkpoint metadata"));
+    }
+
+    /** The storage cannot be created when the checkpoint directory contains a file that is not a marker. */
+    @Test
+    void testNotOnlyMarkersFiles() throws Exception {
+        createFile(createDirectories(cpDir()).resolve("test"));
+
+        IgniteInternalCheckedException exception = assertThrows(
+                IgniteInternalCheckedException.class,
+                () -> new CheckpointMarkersStorage(workDir)
+        );
+
+        assertThat(exception.getMessage(), startsWith("Not checkpoint markers found, they need to be removed manually"));
+    }
+
+    /** The storage cannot be created when the checkpoint directory contains temporary marker files. */
+    @Test
+    void testTmpMarkersFiles() throws Exception {
+        createDirectories(cpDir());
+
+        UUID id = UUID.randomUUID();
+
+        createFile(cpDir().resolve(startMarkerFileName(id) + ".tmp"));
+        createFile(cpDir().resolve(endMarkerFileName(id) + ".tmp"));
+
+        IgniteInternalCheckedException exception = assertThrows(
+                IgniteInternalCheckedException.class,
+                () -> new CheckpointMarkersStorage(workDir)
+        );
+
+        assertThat(exception.getMessage(), startsWith("Not checkpoint markers found, they need to be removed manually"));
+    }
+
+    /** The storage cannot be created when a start marker has no matching end marker. */
+    @Test
+    void testCheckpointWithoutEndMarker() throws Exception {
+        createDirectories(cpDir());
+
+        createFile(startMarkerFilePath(UUID.randomUUID()));
+
+        IgniteInternalCheckedException exception = assertThrows(
+                IgniteInternalCheckedException.class,
+                () -> new CheckpointMarkersStorage(workDir)
+        );
+
+        assertThat(exception.getMessage(), startsWith("Found incomplete checkpoints"));
+    }
+
+    /** Markers are created on checkpoint begin/end, and creation fails once the checkpoint directory is gone. */
+    @Test
+    void testCreateMarkers() throws Exception {
+        UUID id0 = UUID.randomUUID();
+
+        CheckpointMarkersStorage markersStorage = new CheckpointMarkersStorage(workDir);
+
+        markersStorage.onCheckpointBegin(id0);
+        markersStorage.onCheckpointEnd(id0);
+
+        assertThat(
+                cpDirFiles(),
+                equalTo(Set.of(startMarkerFilePath(id0), endMarkerFilePath(id0)))
+        );
+
+        deleteIfExists(cpDir());
+
+        IgniteInternalCheckedException exception = assertThrows(
+                IgniteInternalCheckedException.class,
+                () -> markersStorage.onCheckpointBegin(UUID.randomUUID())
+        );
+
+        assertThat(exception.getMessage(), startsWith("Could not create start checkpoint marker"));
+
+        exception = assertThrows(
+                IgniteInternalCheckedException.class,
+                () -> markersStorage.onCheckpointEnd(id0)
+        );
+
+        assertThat(exception.getMessage(), startsWith("Could not create end checkpoint marker"));
+    }
+
+    /** Only the markers of the last checkpoint remain after several checkpoints have completed. */
+    @Test
+    void testCleanupMarkers() throws Exception {
+        CheckpointMarkersStorage markersStorage = new CheckpointMarkersStorage(workDir);
+
+        UUID id0 = UUID.randomUUID();
+        UUID id1 = UUID.randomUUID();
+        UUID id2 = UUID.randomUUID();
+
+        markersStorage.onCheckpointBegin(id0);
+        markersStorage.onCheckpointEnd(id0);
+
+        markersStorage.onCheckpointBegin(id1);
+        markersStorage.onCheckpointEnd(id1);
+
+        markersStorage.onCheckpointBegin(id2);
+        markersStorage.onCheckpointEnd(id2);
+
+        assertThat(
+                cpDirFiles(),
+                equalTo(Set.of(startMarkerFilePath(id2), endMarkerFilePath(id2)))
+        );
+    }
+
+    /**
+     * Returns the entries of the checkpoint directory. The directory stream is closed before returning so that no open handle is
+     * left behind, which could otherwise prevent the directory from being deleted.
+     */
+    private Set<Path> cpDirFiles() throws Exception {
+        try (var files = list(cpDir())) {
+            return files.collect(toSet());
+        }
+    }
+
+    private Path cpDir() {
+        return workDir.resolve("cp");
+    }
+
+    private Path startMarkerFilePath(UUID id) {
+        return cpDir().resolve(startMarkerFileName(id));
+    }
+
+    private Path endMarkerFilePath(UUID id) {
+        return cpDir().resolve(endMarkerFileName(id));
+    }
+
+    private static String startMarkerFileName(UUID id) {
+        return id + "-START.bin";
+    }
+
+    private static String endMarkerFileName(UUID id) {
+        return id + "-END.bin";
+    }
+}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImplTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImplTest.java
new file mode 100644
index 000000000..5a457f715
--- /dev/null
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImplTest.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.lang.System.nanoTime;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_TAKEN;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.MARKER_STORED_TO_DISK;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.PAGE_SNAPSHOT_TAKEN;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.SCHEDULED;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.Test;
+
+/**
+ * For {@link CheckpointProgressImpl} testing.
+ */
+public class CheckpointProgressImplTest {
+ /** Checks that a newly created progress has a non-null identifier. */
+ @Test
+ void testId() {
+ CheckpointProgressImpl progress = new CheckpointProgressImpl(0);
+
+ assertNotNull(progress.id());
+ }
+
+ /** Checks that the reason is absent for a new progress and is returned once it has been set. */
+ @Test
+ void testReason() {
+ CheckpointProgressImpl progress = new CheckpointProgressImpl(0);
+
+ assertNull(progress.reason());
+
+ String expectedReason = "test";
+
+ progress.reason(expectedReason);
+
+ assertEquals(expectedReason, progress.reason());
+ }
+
+ /**
+ * Checks that the next checkpoint time is not earlier than the moment taken before the progress was created and not later
+ * than the moment taken after the creation plus the value passed to the constructor.
+ */
+ @Test
+ void testNextCheckpointNanos() {
+ long beforeCreationNanos = nanoTime();
+
+ CheckpointProgressImpl progress = new CheckpointProgressImpl(10);
+
+ long afterCreationNanos = nanoTime();
+
+ // Differences are checked against zero rather than comparing the nanoTime() values with each other directly.
+ long sinceLowerBound = progress.nextCheckpointNanos() - beforeCreationNanos;
+
+ assertThat(sinceLowerBound, greaterThanOrEqualTo(0L));
+
+ long untilUpperBound = afterCreationNanos + 10 - progress.nextCheckpointNanos();
+
+ assertThat(untilUpperBound, greaterThanOrEqualTo(0L));
+ }
+
+ /** Checks the pages count and the page counters of a new progress, after counters initialization and after clearing them. */
+ @Test
+ void testCounters() {
+ CheckpointProgressImpl progress = new CheckpointProgressImpl(0);
+
+ // A new progress has neither a pages count nor counters.
+ assertEquals(0, progress.currentCheckpointPagesCount());
+
+ assertNull(progress.writtenPagesCounter());
+ assertNull(progress.syncedPagesCounter());
+ assertNull(progress.evictedPagesCounter());
+
+ // Initialization sets the pages count and creates all counters.
+ progress.initCounters(100500);
+
+ assertEquals(100500, progress.currentCheckpointPagesCount());
+
+ assertNotNull(progress.writtenPagesCounter());
+ assertNotNull(progress.syncedPagesCounter());
+ assertNotNull(progress.evictedPagesCounter());
+
+ // Clearing brings the progress back to the initial state.
+ progress.clearCounters();
+
+ assertEquals(0, progress.currentCheckpointPagesCount());
+
+ assertNull(progress.writtenPagesCounter());
+ assertNull(progress.syncedPagesCounter());
+ assertNull(progress.evictedPagesCounter());
+
+ // The pages count can also be set directly.
+ progress.currentCheckpointPagesCount(42);
+
+ assertEquals(42, progress.currentCheckpointPagesCount());
+ }
+
+ /**
+ * Walks the progress through every checkpoint state and checks after each transition that
+ * {@code greaterOrEqualTo} is {@code true} for the current and all previous states and {@code false} for the later ones.
+ */
+ @Test
+ void testGreaterOrEqualTo() {
+ CheckpointProgressImpl progress = new CheckpointProgressImpl(0);
+
+ // Initial state: only SCHEDULED is reached.
+ assertTrue(progress.greaterOrEqualTo(SCHEDULED));
+ assertFalse(progress.greaterOrEqualTo(LOCK_TAKEN));
+ assertFalse(progress.greaterOrEqualTo(PAGE_SNAPSHOT_TAKEN));
+ assertFalse(progress.greaterOrEqualTo(LOCK_RELEASED));
+ assertFalse(progress.greaterOrEqualTo(MARKER_STORED_TO_DISK));
+ assertFalse(progress.greaterOrEqualTo(FINISHED));
+
+ // After LOCK_TAKEN.
+ progress.transitTo(LOCK_TAKEN);
+
+ assertTrue(progress.greaterOrEqualTo(SCHEDULED));
+ assertTrue(progress.greaterOrEqualTo(LOCK_TAKEN));
+ assertFalse(progress.greaterOrEqualTo(PAGE_SNAPSHOT_TAKEN));
+ assertFalse(progress.greaterOrEqualTo(LOCK_RELEASED));
+ assertFalse(progress.greaterOrEqualTo(MARKER_STORED_TO_DISK));
+ assertFalse(progress.greaterOrEqualTo(FINISHED));
+
+ // After PAGE_SNAPSHOT_TAKEN.
+ progress.transitTo(PAGE_SNAPSHOT_TAKEN);
+
+ assertTrue(progress.greaterOrEqualTo(SCHEDULED));
+ assertTrue(progress.greaterOrEqualTo(LOCK_TAKEN));
+ assertTrue(progress.greaterOrEqualTo(PAGE_SNAPSHOT_TAKEN));
+ assertFalse(progress.greaterOrEqualTo(LOCK_RELEASED));
+ assertFalse(progress.greaterOrEqualTo(MARKER_STORED_TO_DISK));
+ assertFalse(progress.greaterOrEqualTo(FINISHED));
+
+ // After LOCK_RELEASED.
+ progress.transitTo(LOCK_RELEASED);
+
+ assertTrue(progress.greaterOrEqualTo(SCHEDULED));
+ assertTrue(progress.greaterOrEqualTo(LOCK_TAKEN));
+ assertTrue(progress.greaterOrEqualTo(PAGE_SNAPSHOT_TAKEN));
+ assertTrue(progress.greaterOrEqualTo(LOCK_RELEASED));
+ assertFalse(progress.greaterOrEqualTo(MARKER_STORED_TO_DISK));
+ assertFalse(progress.greaterOrEqualTo(FINISHED));
+
+ // After MARKER_STORED_TO_DISK.
+ progress.transitTo(MARKER_STORED_TO_DISK);
+
+ assertTrue(progress.greaterOrEqualTo(SCHEDULED));
+ assertTrue(progress.greaterOrEqualTo(LOCK_TAKEN));
+ assertTrue(progress.greaterOrEqualTo(PAGE_SNAPSHOT_TAKEN));
+ assertTrue(progress.greaterOrEqualTo(LOCK_RELEASED));
+ assertTrue(progress.greaterOrEqualTo(MARKER_STORED_TO_DISK));
+ assertFalse(progress.greaterOrEqualTo(FINISHED));
+
+ // After FINISHED: every state is reached.
+ progress.transitTo(FINISHED);
+
+ assertTrue(progress.greaterOrEqualTo(SCHEDULED));
+ assertTrue(progress.greaterOrEqualTo(LOCK_TAKEN));
+ assertTrue(progress.greaterOrEqualTo(PAGE_SNAPSHOT_TAKEN));
+ assertTrue(progress.greaterOrEqualTo(LOCK_RELEASED));
+ assertTrue(progress.greaterOrEqualTo(MARKER_STORED_TO_DISK));
+ assertTrue(progress.greaterOrEqualTo(FINISHED));
+ }
+
+ /**
+ * Checks that the progress reports itself as being in progress in the {@code LOCK_RELEASED} and
+ * {@code MARKER_STORED_TO_DISK} states, but not initially, in {@code LOCK_TAKEN} or in {@code FINISHED}.
+ */
+ @Test
+ void testInProgress() {
+ CheckpointProgressImpl progress = new CheckpointProgressImpl(0);
+
+ assertFalse(progress.inProgress());
+
+ progress.transitTo(LOCK_TAKEN);
+
+ assertFalse(progress.inProgress());
+
+ progress.transitTo(LOCK_RELEASED);
+
+ assertTrue(progress.inProgress());
+
+ progress.transitTo(MARKER_STORED_TO_DISK);
+
+ assertTrue(progress.inProgress());
+
+ progress.transitTo(FINISHED);
+
+ assertFalse(progress.inProgress());
+ }
+
+ /**
+ * Checks that failing the progress moves it to the {@code FINISHED} state and completes a previously requested state future
+ * exceptionally with the failure reason.
+ */
+ @Test
+ void testFail() {
+ CheckpointProgressImpl progress = new CheckpointProgressImpl(0);
+
+ CompletableFuture<?> snapshotTakenFuture = progress.futureFor(PAGE_SNAPSHOT_TAKEN);
+
+ Exception cause = new Exception("test");
+
+ progress.fail(cause);
+
+ assertEquals(FINISHED, progress.state());
+
+ ExecutionException thrown = assertThrows(
+ ExecutionException.class,
+ () -> snapshotTakenFuture.get(100, TimeUnit.MILLISECONDS)
+ );
+
+ assertSame(cause, thrown.getCause());
+ }
+
+ /**
+ * Checks that a future is returned for every state, that the same instance is returned on repeated requests, and that
+ * a transition to {@code LOCK_RELEASED} completes the futures of that state and of all earlier ones only.
+ */
+ @Test
+ void testFutureFor() {
+ CheckpointProgressImpl progress = new CheckpointProgressImpl(0);
+
+ // The future of the initial state is already completed.
+ CompletableFuture<?> scheduledFut = progress.futureFor(SCHEDULED);
+
+ assertNotNull(scheduledFut);
+ assertTrue(scheduledFut.isDone());
+ assertSame(scheduledFut, progress.futureFor(SCHEDULED));
+
+ // Futures of all later states are not completed yet.
+ CompletableFuture<?> lockTakenFut = progress.futureFor(LOCK_TAKEN);
+
+ assertNotNull(lockTakenFut);
+ assertFalse(lockTakenFut.isDone());
+ assertSame(lockTakenFut, progress.futureFor(LOCK_TAKEN));
+
+ CompletableFuture<?> snapshotTakenFut = progress.futureFor(PAGE_SNAPSHOT_TAKEN);
+
+ assertNotNull(snapshotTakenFut);
+ assertFalse(snapshotTakenFut.isDone());
+ assertSame(snapshotTakenFut, progress.futureFor(PAGE_SNAPSHOT_TAKEN));
+
+ CompletableFuture<?> lockReleasedFut = progress.futureFor(LOCK_RELEASED);
+
+ assertNotNull(lockReleasedFut);
+ assertFalse(lockReleasedFut.isDone());
+ assertSame(lockReleasedFut, progress.futureFor(LOCK_RELEASED));
+
+ CompletableFuture<?> markerStoredFut = progress.futureFor(MARKER_STORED_TO_DISK);
+
+ assertNotNull(markerStoredFut);
+ assertFalse(markerStoredFut.isDone());
+ assertSame(markerStoredFut, progress.futureFor(MARKER_STORED_TO_DISK));
+
+ CompletableFuture<?> finishedFut = progress.futureFor(FINISHED);
+
+ assertNotNull(finishedFut);
+ assertFalse(finishedFut.isDone());
+ assertSame(finishedFut, progress.futureFor(FINISHED));
+
+ // Jumping straight to LOCK_RELEASED completes the futures up to and including that state.
+ progress.transitTo(LOCK_RELEASED);
+
+ assertDoesNotThrow(() -> scheduledFut.get(100, TimeUnit.MILLISECONDS));
+ assertDoesNotThrow(() -> lockTakenFut.get(100, TimeUnit.MILLISECONDS));
+ assertDoesNotThrow(() -> snapshotTakenFut.get(100, TimeUnit.MILLISECONDS));
+ assertDoesNotThrow(() -> lockReleasedFut.get(100, TimeUnit.MILLISECONDS));
+
+ assertFalse(markerStoredFut.isDone());
+ assertFalse(finishedFut.isDone());
+
+ // The transition does not replace the future instances.
+ assertSame(scheduledFut, progress.futureFor(SCHEDULED));
+ assertSame(lockTakenFut, progress.futureFor(LOCK_TAKEN));
+ assertSame(snapshotTakenFut, progress.futureFor(PAGE_SNAPSHOT_TAKEN));
+ assertSame(lockReleasedFut, progress.futureFor(LOCK_RELEASED));
+ assertSame(markerStoredFut, progress.futureFor(MARKER_STORED_TO_DISK));
+ assertSame(finishedFut, progress.futureFor(FINISHED));
+ }
+
+ /**
+ * Checks that the progress state only moves forward: after each transition to the next state, attempts to transit
+ * to any of the earlier states leave the current state unchanged.
+ */
+ @Test
+ void testTransitTo() {
+ CheckpointProgressImpl progressImpl = new CheckpointProgressImpl(0);
+
+ // A new progress starts in the SCHEDULED state.
+ assertEquals(SCHEDULED, progressImpl.state());
+
+ // Transit to LOCK_TAKEN.
+
+ progressImpl.transitTo(LOCK_TAKEN);
+ assertEquals(LOCK_TAKEN, progressImpl.state());
+
+ progressImpl.transitTo(SCHEDULED);
+ assertEquals(LOCK_TAKEN, progressImpl.state());
+
+ // Transit to PAGE_SNAPSHOT_TAKEN.
+
+ progressImpl.transitTo(PAGE_SNAPSHOT_TAKEN);
+ assertEquals(PAGE_SNAPSHOT_TAKEN, progressImpl.state());
+
+ progressImpl.transitTo(LOCK_TAKEN);
+ assertEquals(PAGE_SNAPSHOT_TAKEN, progressImpl.state());
+
+ progressImpl.transitTo(SCHEDULED);
+ assertEquals(PAGE_SNAPSHOT_TAKEN, progressImpl.state());
+
+ // Transit to LOCK_RELEASED.
+
+ progressImpl.transitTo(LOCK_RELEASED);
+ assertEquals(LOCK_RELEASED, progressImpl.state());
+
+ progressImpl.transitTo(PAGE_SNAPSHOT_TAKEN);
+ assertEquals(LOCK_RELEASED, progressImpl.state());
+
+ progressImpl.transitTo(LOCK_TAKEN);
+ assertEquals(LOCK_RELEASED, progressImpl.state());
+
+ progressImpl.transitTo(SCHEDULED);
+ assertEquals(LOCK_RELEASED, progressImpl.state());
+
+ // Transit to MARKER_STORED_TO_DISK.
+
+ progressImpl.transitTo(MARKER_STORED_TO_DISK);
+ assertEquals(MARKER_STORED_TO_DISK, progressImpl.state());
+
+ progressImpl.transitTo(LOCK_RELEASED);
+ assertEquals(MARKER_STORED_TO_DISK, progressImpl.state());
+
+ progressImpl.transitTo(PAGE_SNAPSHOT_TAKEN);
+ assertEquals(MARKER_STORED_TO_DISK, progressImpl.state());
+
+ progressImpl.transitTo(LOCK_TAKEN);
+ assertEquals(MARKER_STORED_TO_DISK, progressImpl.state());
+
+ progressImpl.transitTo(SCHEDULED);
+ assertEquals(MARKER_STORED_TO_DISK, progressImpl.state());
+
+ // Transit to FINISHED.
+
+ progressImpl.transitTo(FINISHED);
+ assertEquals(FINISHED, progressImpl.state());
+
+ progressImpl.transitTo(MARKER_STORED_TO_DISK);
+ assertEquals(FINISHED, progressImpl.state());
+
+ progressImpl.transitTo(LOCK_RELEASED);
+ assertEquals(FINISHED, progressImpl.state());
+
+ progressImpl.transitTo(PAGE_SNAPSHOT_TAKEN);
+ assertEquals(FINISHED, progressImpl.state());
+
+ progressImpl.transitTo(LOCK_TAKEN);
+ assertEquals(FINISHED, progressImpl.state());
+
+ progressImpl.transitTo(SCHEDULED);
+ assertEquals(FINISHED, progressImpl.state());
+ }
+}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
new file mode 100644
index 000000000..df5ba882a
--- /dev/null
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.IgniteConcurrentMultiPairQueue.EMPTY;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.List;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.junit.jupiter.api.Test;
+
+/**
+ * For {@link Checkpoint} testing.
+ */
+public class CheckpointTest {
+ @Test
+ void testHasDelta() {
+ CheckpointProgressImpl progress = mock(CheckpointProgressImpl.class);
+
+ assertFalse(new Checkpoint(EMPTY, progress).hasDelta());
+
+ IgniteBiTuple<PageMemoryImpl, FullPageId[]> biTuple = new IgniteBiTuple<>(
+ mock(PageMemoryImpl.class),
+ new FullPageId[]{new FullPageId(0, 1)}
+ );
+
+ assertTrue(new Checkpoint(new IgniteConcurrentMultiPairQueue<>(List.of(biTuple)), progress).hasDelta());
+ }
+}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTestUtils.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTestUtils.java
new file mode 100644
index 000000000..1459eb2ea
--- /dev/null
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTestUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.lang.IgniteLogger;
+
+/**
+ * Useful class for testing a checkpoint.
+ */
+class CheckpointTestUtils {
+ /**
+ * Returns new instance of {@link CheckpointReadWriteLock}.
+ *
+ * @param log Logger.
+ */
+ static CheckpointReadWriteLock newReadWriteLock(IgniteLogger log) {
+ return new CheckpointReadWriteLock(new ReentrantReadWriteLockWithTracking(log, 5_000));
+ }
+
+ /**
+ * Returns mocked instance of {@link PageMemoryDataRegion}.
+ *
+ * @param persistent Persistent data region.
+ * @param pageMemory Persistent page memory.
+ */
+ static PageMemoryDataRegion newDataRegion(boolean persistent, PageMemoryImpl pageMemory) {
+ PageMemoryDataRegion mock = mock(PageMemoryDataRegion.class);
+
+ when(mock.persistent()).thenReturn(persistent);
+
+ when(mock.pageMemory()).thenReturn(pageMemory);
+
+ return mock;
+ }
+}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
new file mode 100644
index 000000000..a28b4c6a3
--- /dev/null
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.Comparator.comparing;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_TAKEN;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.MARKER_STORED_TO_DISK;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.PAGE_SNAPSHOT_TAKEN;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointTestUtils.newDataRegion;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointTestUtils.newReadWriteLock;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.AFTER_CHECKPOINT_END;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.BEFORE_CHECKPOINT_BEGIN;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.ON_CHECKPOINT_BEGIN;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.ON_MARK_CHECKPOINT_BEGIN;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWriteOrder.RANDOM;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWriteOrder.SEQUENTIAL;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.IgniteConcurrentMultiPairQueue.EMPTY;
+import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.internal.pagememory.persistence.checkpoint.IgniteConcurrentMultiPairQueue.Result;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+/**
+ * For {@link CheckpointWorkflow} testing.
+ */
+public class CheckpointWorkflowTest {
+ /** Logger that the tests pass to the checkpoint read-write locks they create. */
+ private final IgniteLogger log = IgniteLogger.forClass(CheckpointWorkflowTest.class);
+
+ /** Workflow created by the current test, {@code null} until a test assigns it; stopped in {@link #tearDown()}. */
+ @Nullable
+ private CheckpointWorkflow workflow;
+
+ /** Stops the workflow after each test, provided the test has created one. */
+ @AfterEach
+ void tearDown() {
+ CheckpointWorkflow createdWorkflow = workflow;
+
+ if (createdWorkflow == null) {
+ return;
+ }
+
+ createdWorkflow.stop();
+ }
+
+ /**
+ * Checks that listeners are collected per data region, that a listener added without a region is collected for any region,
+ * that removed listeners are no longer collected, and that no listeners are left after the workflow is stopped.
+ */
+ @Test
+ void testListeners() {
+ PageMemoryDataRegion regionA = newDataRegion(true, mock(PageMemoryImpl.class));
+ PageMemoryDataRegion regionB = newDataRegion(true, mock(PageMemoryImpl.class));
+ // This region is not passed to the workflow and gets no listener of its own.
+ PageMemoryDataRegion regionC = newDataRegion(true, mock(PageMemoryImpl.class));
+
+ workflow = new CheckpointWorkflow(
+ mock(CheckpointMarkersStorage.class),
+ newReadWriteLock(log),
+ RANDOM,
+ List.of(regionA, regionB)
+ );
+
+ workflow.start();
+
+ CheckpointListener regionAListener0 = mock(CheckpointListener.class);
+ CheckpointListener regionAListener1 = mock(CheckpointListener.class);
+ CheckpointListener regionAListener2 = mock(CheckpointListener.class);
+ CheckpointListener regionBListener = mock(CheckpointListener.class);
+ CheckpointListener noRegionListener = mock(CheckpointListener.class);
+
+ workflow.addCheckpointListener(regionAListener0, regionA);
+ workflow.addCheckpointListener(regionAListener1, regionA);
+ workflow.addCheckpointListener(regionAListener2, regionA);
+ workflow.addCheckpointListener(regionBListener, regionB);
+ workflow.addCheckpointListener(noRegionListener, null);
+
+ // Checks collecting listeners.
+
+ assertThat(
+ Set.copyOf(workflow.collectCheckpointListeners(List.of(regionA))),
+ equalTo(Set.of(regionAListener0, regionAListener1, regionAListener2, noRegionListener))
+ );
+
+ assertThat(
+ Set.copyOf(workflow.collectCheckpointListeners(List.of(regionB))),
+ equalTo(Set.of(regionBListener, noRegionListener))
+ );
+
+ assertThat(
+ Set.copyOf(workflow.collectCheckpointListeners(List.of(regionC))),
+ equalTo(Set.of(noRegionListener))
+ );
+
+ assertThat(
+ Set.copyOf(workflow.collectCheckpointListeners(List.of(regionA, regionB))),
+ equalTo(Set.of(regionAListener0, regionAListener1, regionAListener2, regionBListener, noRegionListener))
+ );
+
+ // Checks remove listener.
+
+ workflow.removeCheckpointListener(regionAListener0);
+ workflow.removeCheckpointListener(regionAListener1);
+ workflow.removeCheckpointListener(regionBListener);
+
+ assertThat(
+ Set.copyOf(workflow.collectCheckpointListeners(List.of(regionA))),
+ equalTo(Set.of(regionAListener2, noRegionListener))
+ );
+
+ assertThat(
+ Set.copyOf(workflow.collectCheckpointListeners(List.of(regionB))),
+ equalTo(Set.of(noRegionListener))
+ );
+
+ assertThat(
+ Set.copyOf(workflow.collectCheckpointListeners(List.of(regionC))),
+ equalTo(Set.of(noRegionListener))
+ );
+
+ // Checks empty listeners after stop.
+
+ workflow.stop();
+
+ assertThat(workflow.collectCheckpointListeners(List.of(regionA)), empty());
+ assertThat(workflow.collectCheckpointListeners(List.of(regionB)), empty());
+ assertThat(workflow.collectCheckpointListeners(List.of(regionC)), empty());
+ }
+
+ /**
+ * Checks {@code markCheckpointBegin}: the order in which the listener callbacks are invoked, the lock that is held in each
+ * callback, the state transitions recorded on the progress by that moment, the resulting dirty pages and that the start
+ * marker is written exactly once and only after all callbacks.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ void testMarkCheckpointBegin() throws Exception {
+ CheckpointMarkersStorage markersStorage = mock(CheckpointMarkersStorage.class);
+
+ CheckpointReadWriteLock readWriteLock = newReadWriteLock(log);
+
+ List<FullPageId> dirtyPages = List.of(new FullPageId(0, 0), new FullPageId(1, 0), new FullPageId(2, 0));
+
+ PageMemoryDataRegion dataRegion = newDataRegion(true, newPageMemoryImpl(dirtyPages));
+
+ workflow = new CheckpointWorkflow(markersStorage, readWriteLock, RANDOM, List.of(dataRegion));
+
+ workflow.start();
+
+ CheckpointProgressImpl progressImpl = mock(CheckpointProgressImpl.class);
+
+ // The captors record every state transition and pages count that the workflow reports to the progress.
+ ArgumentCaptor<CheckpointState> checkpointStateArgumentCaptor = ArgumentCaptor.forClass(CheckpointState.class);
+
+ ArgumentCaptor<Integer> pagesCountArgumentCaptor = ArgumentCaptor.forClass(Integer.class);
+
+ doNothing().when(progressImpl).transitTo(checkpointStateArgumentCaptor.capture());
+
+ doNothing().when(progressImpl).currentCheckpointPagesCount(pagesCountArgumentCaptor.capture());
+
+ when(progressImpl.futureFor(MARKER_STORED_TO_DISK)).thenReturn(completedFuture(null));
+
+ UUID checkpointId = UUID.randomUUID();
+
+ when(progressImpl.id()).thenReturn(checkpointId);
+
+ List<String> events = new ArrayList<>();
+
+ workflow.addCheckpointListener(new TestCheckpointListener(events) {
+ /** {@inheritDoc} */
+ @Override
+ public void beforeCheckpointBegin(CheckpointProgress progress) throws IgniteInternalCheckedException {
+ super.beforeCheckpointBegin(progress);
+
+ assertSame(progressImpl, progress);
+
+ // Expected value goes first, as in the other assertions of this class.
+ assertEquals(1, readWriteLock.getReadHoldCount());
+
+ assertThat(checkpointStateArgumentCaptor.getAllValues(), empty());
+
+ verify(markersStorage, times(0)).onCheckpointBegin(checkpointId);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void onMarkCheckpointBegin(CheckpointProgress progress) throws IgniteInternalCheckedException {
+ super.onMarkCheckpointBegin(progress);
+
+ assertSame(progressImpl, progress);
+
+ assertTrue(readWriteLock.isWriteLockHeldByCurrentThread());
+
+ assertThat(checkpointStateArgumentCaptor.getAllValues(), equalTo(List.of(LOCK_TAKEN)));
+
+ verify(markersStorage, times(0)).onCheckpointBegin(checkpointId);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void onCheckpointBegin(CheckpointProgress progress) throws IgniteInternalCheckedException {
+ super.onCheckpointBegin(progress);
+
+ assertSame(progressImpl, progress);
+
+ assertEquals(0, readWriteLock.getReadHoldCount());
+
+ assertFalse(readWriteLock.isWriteLockHeldByCurrentThread());
+
+ assertThat(checkpointStateArgumentCaptor.getAllValues(), equalTo(List.of(LOCK_TAKEN, PAGE_SNAPSHOT_TAKEN, LOCK_RELEASED)));
+
+ assertThat(pagesCountArgumentCaptor.getAllValues(), equalTo(List.of(3)));
+
+ verify(markersStorage, times(0)).onCheckpointBegin(checkpointId);
+ }
+ }, dataRegion);
+
+ Checkpoint checkpoint = workflow.markCheckpointBegin(coarseCurrentTimeMillis(), progressImpl);
+
+ List<IgniteBiTuple<PageMemoryImpl, FullPageId>> pairs = collect(checkpoint.dirtyPages);
+
+ assertThat(
+ pairs.stream().map(IgniteBiTuple::getKey).collect(toSet()),
+ equalTo(Set.of(dataRegion.pageMemory()))
+ );
+
+ assertThat(
+ pairs.stream().map(IgniteBiTuple::getValue).collect(toList()),
+ equalTo(dirtyPages)
+ );
+
+ assertThat(
+ events,
+ equalTo(List.of(BEFORE_CHECKPOINT_BEGIN, ON_MARK_CHECKPOINT_BEGIN, ON_CHECKPOINT_BEGIN))
+ );
+
+ assertThat(
+ checkpointStateArgumentCaptor.getAllValues(),
+ equalTo(List.of(LOCK_TAKEN, PAGE_SNAPSHOT_TAKEN, LOCK_RELEASED, MARKER_STORED_TO_DISK))
+ );
+
+ verify(markersStorage, times(1)).onCheckpointBegin(checkpointId);
+ }
+
+ /**
+ * Checks that with the {@code RANDOM} write order the checkpoint contains the dirty pages in the same order
+ * in which the page memory returned them.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ void testMarkCheckpointBeginRandom() throws Exception {
+ List<FullPageId> unsortedPages = List.of(new FullPageId(1, 0), new FullPageId(0, 0), new FullPageId(2, 0));
+
+ PageMemoryDataRegion region = newDataRegion(true, newPageMemoryImpl(unsortedPages));
+
+ workflow = new CheckpointWorkflow(mock(CheckpointMarkersStorage.class), newReadWriteLock(log), RANDOM, List.of(region));
+
+ workflow.start();
+
+ CheckpointProgressImpl progressMock = mock(CheckpointProgressImpl.class);
+
+ when(progressMock.futureFor(MARKER_STORED_TO_DISK)).thenReturn(completedFuture(null));
+
+ Checkpoint checkpoint = workflow.markCheckpointBegin(coarseCurrentTimeMillis(), progressMock);
+
+ List<FullPageId> checkpointPages = collect(checkpoint.dirtyPages).stream()
+ .map(IgniteBiTuple::getValue)
+ .collect(toList());
+
+ assertThat(checkpointPages, equalTo(unsortedPages));
+ }
+
+ /**
+ * Checks that with the {@code SEQUENTIAL} write order the checkpoint contains the dirty pages sorted by
+ * {@link FullPageId#effectivePageId()}.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ void testMarkCheckpointBeginSequential() throws Exception {
+ List<FullPageId> dirtyPages = List.of(new FullPageId(1, 0), new FullPageId(0, 0), new FullPageId(2, 0));
+
+ PageMemoryDataRegion dataRegion = newDataRegion(true, newPageMemoryImpl(dirtyPages));
+
+ workflow = new CheckpointWorkflow(
+ mock(CheckpointMarkersStorage.class),
+ newReadWriteLock(log),
+ SEQUENTIAL,
+ List.of(dataRegion)
+ );
+
+ // Started before use like in the other tests of this class; the workflow is stopped in tearDown().
+ workflow.start();
+
+ CheckpointProgressImpl progressImpl = mock(CheckpointProgressImpl.class);
+
+ when(progressImpl.futureFor(MARKER_STORED_TO_DISK)).thenReturn(completedFuture(null));
+
+ Checkpoint checkpoint = workflow.markCheckpointBegin(coarseCurrentTimeMillis(), progressImpl);
+
+ assertThat(
+ collect(checkpoint.dirtyPages).stream().map(IgniteBiTuple::getValue).collect(toList()),
+ equalTo(dirtyPages.stream().sorted(comparing(FullPageId::effectivePageId)).collect(toList()))
+ );
+ }
+
+ /**
+ * Checks {@code markCheckpointEnd}: the listener is notified without any checkpoint lock being held, after the end marker
+ * has been written and before the progress is moved to {@code FINISHED}; the counters are cleared and the page memory
+ * finishes the checkpoint exactly once.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ void testMarkCheckpointEnd() throws Exception {
+ CheckpointMarkersStorage storageMock = mock(CheckpointMarkersStorage.class);
+
+ CheckpointReadWriteLock lock = newReadWriteLock(log);
+
+ PageMemoryImpl pageMemoryMock = mock(PageMemoryImpl.class);
+
+ PageMemoryDataRegion region = newDataRegion(true, pageMemoryMock);
+
+ workflow = new CheckpointWorkflow(storageMock, lock, RANDOM, List.of(region));
+
+ workflow.start();
+
+ List<String> recordedEvents = new ArrayList<>();
+
+ // Records every state transition reported to the progress.
+ ArgumentCaptor<CheckpointState> stateCaptor = ArgumentCaptor.forClass(CheckpointState.class);
+
+ CheckpointProgressImpl progressMock = mock(CheckpointProgressImpl.class);
+
+ doNothing().when(progressMock).transitTo(stateCaptor.capture());
+
+ UUID checkpointId = UUID.randomUUID();
+
+ when(progressMock.id()).thenReturn(checkpointId);
+
+ CheckpointListener listener = new TestCheckpointListener(recordedEvents) {
+ /** {@inheritDoc} */
+ @Override
+ public void afterCheckpointEnd(CheckpointProgress progress) throws IgniteInternalCheckedException {
+ super.afterCheckpointEnd(progress);
+
+ assertSame(progressMock, progress);
+
+ assertFalse(lock.isWriteLockHeldByCurrentThread());
+
+ assertEquals(0, lock.getReadHoldCount());
+
+ assertThat(stateCaptor.getAllValues(), empty());
+
+ verify(storageMock, times(1)).onCheckpointEnd(checkpointId);
+ }
+ };
+
+ workflow.addCheckpointListener(listener, region);
+
+ workflow.markCheckpointEnd(new Checkpoint(EMPTY, progressMock));
+
+ assertThat(stateCaptor.getAllValues(), equalTo(List.of(FINISHED)));
+
+ assertThat(recordedEvents, equalTo(List.of(AFTER_CHECKPOINT_END)));
+
+ verify(progressMock, times(1)).clearCounters();
+
+ verify(pageMemoryMock, times(1)).finishCheckpoint();
+ }
+
+ /**
+ * Drains the queue into a list of key-value pairs, keeping the order in which the queue returns them.
+ *
+ * @param queue Queue to read until {@code next} returns {@code false}.
+ */
+ private List<IgniteBiTuple<PageMemoryImpl, FullPageId>> collect(IgniteConcurrentMultiPairQueue<PageMemoryImpl, FullPageId> queue) {
+ // Holder that the queue refills on every successful call of next().
+ Result<PageMemoryImpl, FullPageId> cursor = new Result<>();
+
+ List<IgniteBiTuple<PageMemoryImpl, FullPageId>> pairs = new ArrayList<>();
+
+ for (boolean hasNext = queue.next(cursor); hasNext; hasNext = queue.next(cursor)) {
+ pairs.add(new IgniteBiTuple<>(cursor.getKey(), cursor.getValue()));
+ }
+
+ return pairs;
+ }
+
+ /**
+ * Creates a page memory mock whose {@code beginCheckpoint} returns the given dirty pages for any future.
+ *
+ * @param dirtyPages Dirty pages to be returned by the mock.
+ */
+ private PageMemoryImpl newPageMemoryImpl(Collection<FullPageId> dirtyPages) {
+ PageMemoryImpl pageMemoryMock = mock(PageMemoryImpl.class);
+
+ when(pageMemoryMock.beginCheckpoint(any(CompletableFuture.class))).thenReturn(dirtyPages);
+
+ return pageMemoryMock;
+ }
+
+ /**
+ * Listener for tests that appends the name of every invoked callback to the list passed to the constructor.
+ */
+ static class TestCheckpointListener implements CheckpointListener {
+ static final String ON_MARK_CHECKPOINT_BEGIN = "onMarkCheckpointBegin";
+
+ static final String ON_CHECKPOINT_BEGIN = "onCheckpointBegin";
+
+ static final String BEFORE_CHECKPOINT_BEGIN = "beforeCheckpointBegin";
+
+ static final String AFTER_CHECKPOINT_END = "afterCheckpointEnd";
+
+ /** Names of the invoked callbacks, in invocation order. */
+ final List<String> events;
+
+ /**
+ * Constructor.
+ *
+ * @param events List that receives the recorded events.
+ */
+ TestCheckpointListener(List<String> events) {
+ this.events = events;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void beforeCheckpointBegin(CheckpointProgress progress) throws IgniteInternalCheckedException {
+ events.add(BEFORE_CHECKPOINT_BEGIN);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void onMarkCheckpointBegin(CheckpointProgress progress) throws IgniteInternalCheckedException {
+ events.add(ON_MARK_CHECKPOINT_BEGIN);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void onCheckpointBegin(CheckpointProgress progress) throws IgniteInternalCheckedException {
+ events.add(ON_CHECKPOINT_BEGIN);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void afterCheckpointEnd(CheckpointProgress progress) throws IgniteInternalCheckedException {
+ events.add(AFTER_CHECKPOINT_END);
+ }
+ }
+}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteConcurrentMultiPairQueueTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteConcurrentMultiPairQueueTest.java
new file mode 100644
index 000000000..d1c817d67
--- /dev/null
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteConcurrentMultiPairQueueTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.Collections.synchronizedCollection;
+import static java.util.concurrent.ThreadLocalRandom.current;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runMultiThreaded;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.pagememory.persistence.checkpoint.IgniteConcurrentMultiPairQueue.Result;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * For {@link IgniteConcurrentMultiPairQueue} testing.
+ */
+public class IgniteConcurrentMultiPairQueueTest {
+ private IgniteConcurrentMultiPairQueue<Integer, Integer> queue;
+
+ private IgniteConcurrentMultiPairQueue<Integer, Integer> queue2;
+
+ private Map<Integer, Collection<Integer>> mapForCheck;
+
+ private Map<Integer, Collection<Integer>> mapForCheck2;
+
+ private Integer[] arr2 = {2, 4};
+
+ private Integer[] arr1 = {1, 3, 5, 7, 9, 11, 13, 15, 17, 19};
+
+ private Integer[] arr4 = {};
+
+ private Integer[] arr5 = {};
+
+ private Integer[] arr3 = {100, 200, 300, 400, 500, 600, 600, 700};
+
+ private Integer[] arr6 = {};
+
+ @BeforeEach
+ void setUp() {
+ Collection<IgniteBiTuple<Integer, Integer[]>> keyWithArr = new HashSet<>();
+
+ mapForCheck = new ConcurrentHashMap<>();
+
+ mapForCheck2 = new ConcurrentHashMap<>();
+
+ keyWithArr.add(new IgniteBiTuple<>(10, arr2));
+ keyWithArr.add(new IgniteBiTuple<>(20, arr1));
+ keyWithArr.add(new IgniteBiTuple<>(30, arr4));
+ keyWithArr.add(new IgniteBiTuple<>(40, arr5));
+ keyWithArr.add(new IgniteBiTuple<>(50, arr3));
+ keyWithArr.add(new IgniteBiTuple<>(60, arr6));
+
+ mapForCheck.put(10, synchronizedCollection(new ArrayList<>(Arrays.asList(arr2))));
+ mapForCheck.put(20, synchronizedCollection(new ArrayList<>(Arrays.asList(arr1))));
+ mapForCheck.put(50, synchronizedCollection(new ArrayList<>(Arrays.asList(arr3))));
+
+ mapForCheck2.put(10, synchronizedCollection(new ArrayList<>(Arrays.asList(arr2))));
+ mapForCheck2.put(20, synchronizedCollection(new ArrayList<>(Arrays.asList(arr1))));
+ mapForCheck2.put(50, synchronizedCollection(new ArrayList<>(Arrays.asList(arr3))));
+
+ queue = new IgniteConcurrentMultiPairQueue<>(keyWithArr);
+
+ Map<Integer, Collection<Integer>> keyWithColl = new HashMap<>();
+
+ keyWithColl.put(10, synchronizedCollection(new ArrayList<>(Arrays.asList(arr2))));
+ keyWithColl.put(20, synchronizedCollection(new ArrayList<>(Arrays.asList(arr1))));
+ keyWithColl.put(30, synchronizedCollection(new ArrayList<>(Arrays.asList(arr4))));
+ keyWithColl.put(40, synchronizedCollection(new ArrayList<>(Arrays.asList(arr5))));
+ keyWithColl.put(50, synchronizedCollection(new ArrayList<>(Arrays.asList(arr3))));
+ keyWithColl.put(60, synchronizedCollection(new ArrayList<>(Arrays.asList(arr6))));
+
+ queue2 = new IgniteConcurrentMultiPairQueue<>(keyWithColl);
+ }
+
+ @Test
+ public void testGridConcurrentMultiPairQueueCorrectness() throws Exception {
+ runMultiThreaded(() -> {
+ Result<Integer, Integer> res = new Result<>();
+
+ while (queue.next(res)) {
+ assertTrue(mapForCheck.containsKey(res.getKey()));
+
+ assertTrue(mapForCheck.get(res.getKey()).remove(res.getValue()));
+
+ Collection<Integer> coll = mapForCheck.get(res.getKey());
+
+ if (coll != null && coll.isEmpty()) {
+ mapForCheck.remove(res.getKey(), coll);
+ }
+ }
+
+ return null;
+ }, current().nextInt(1, 20), "GridConcurrentMultiPairQueue arr test");
+
+ assertTrue(mapForCheck.isEmpty());
+
+ assertTrue(queue.isEmpty());
+
+ assertEquals(queue.initialSize(), arr1.length + arr2.length + arr3.length + arr4.length);
+
+ runMultiThreaded(() -> {
+ Result<Integer, Integer> res = new Result<>();
+
+ while (queue2.next(res)) {
+ assertTrue(mapForCheck2.containsKey(res.getKey()));
+
+ assertTrue(mapForCheck2.get(res.getKey()).remove(res.getValue()));
+
+ Collection<Integer> coll = mapForCheck2.get(res.getKey());
+
+ if (coll != null && coll.isEmpty()) {
+ mapForCheck2.remove(res.getKey(), coll);
+ }
+ }
+
+ return null;
+ }, current().nextInt(1, 20), "GridConcurrentMultiPairQueue coll test");
+
+ assertTrue(mapForCheck2.isEmpty());
+
+ assertTrue(queue2.isEmpty());
+
+ assertEquals(queue2.initialSize(), arr1.length + arr2.length + arr3.length + arr4.length);
+ }
+}