You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/06/01 08:35:02 UTC
[ignite-3] branch main updated: IGNITE-16984 [Native Persistence 3.0] Porting a checkpoint and related code, part 4 (#822)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6fdb7c4bc IGNITE-16984 [Native Persistence 3.0] Porting a checkpoint and related code, part 4 (#822)
6fdb7c4bc is described below
commit 6fdb7c4bc70da1006b7ee908a04e93409dd23f88
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Wed Jun 1 11:34:58 2022 +0300
IGNITE-16984 [Native Persistence 3.0] Porting a checkpoint and related code, part 4 (#822)
---
.../ignite/internal/util/worker/IgniteWorker.java | 16 +-
.../testframework/BaseIgniteAbstractTest.java | 7 +-
.../persistence/ItBplusTreePageMemoryImplTest.java | 6 +-
.../ItBplusTreeReuseListPageMemoryImplTest.java | 6 +-
.../tree/ItBplusTreeReplaceRemoveRaceTest.java | 2 -
.../pagememory/tree/ItBplusTreeSelfTest.java | 11 +-
.../PageMemoryCheckpointConfigurationSchema.java | 5 +-
.../pagememory/impl/PageMemoryNoStoreImpl.java | 11 +-
.../pagememory/persistence/PageMemoryEx.java | 118 -----------
.../pagememory/persistence/PageMemoryImpl.java | 133 ++++++++----
.../persistence/checkpoint/CheckpointManager.java | 208 +++++++++++++++++++
.../checkpoint/CheckpointReadWriteLock.java | 9 +-
.../persistence/checkpoint/CheckpointWorkflow.java | 11 +-
.../persistence/checkpoint/Checkpointer.java | 39 +---
.../ReentrantReadWriteLockWithTracking.java | 2 +-
...ointTestUtils.java => PageMemoryTestUtils.java} | 25 +--
.../pagememory/freelist/AbstractFreeListTest.java | 2 -
.../pagememory/impl/PageMemoryNoLoadSelfTest.java | 24 +--
.../persistence/PageMemoryImplNoLoadTest.java | 225 +++++++++++++++++----
.../checkpoint/CheckpointManagerTest.java | 109 ++++++++++
.../checkpoint/CheckpointTestUtils.java | 35 ++--
.../checkpoint/CheckpointWorkflowTest.java | 23 ++-
.../persistence/checkpoint/CheckpointerTest.java | 29 ---
.../pagememory/VolatilePageMemoryDataRegion.java | 5 -
24 files changed, 708 insertions(+), 353 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/IgniteWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/IgniteWorker.java
index 0cf0a40f6..43e03df07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/IgniteWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/IgniteWorker.java
@@ -65,7 +65,7 @@ public abstract class IgniteWorker implements Runnable, WorkProgressDispatcher {
private final Object mux = new Object();
/**
- * Creates new grid worker with given parameters.
+ * Creates new ignite worker with given parameters.
*
* @param log Logger.
* @param igniteInstanceName Name of the Ignite instance this runnable is used in.
@@ -94,7 +94,7 @@ public abstract class IgniteWorker implements Runnable, WorkProgressDispatcher {
runner = Thread.currentThread();
if (log.isDebugEnabled()) {
- log.debug("Grid runnable started: " + name);
+ log.debug("Ignite runnable started: " + name);
}
try {
@@ -118,7 +118,7 @@ public abstract class IgniteWorker implements Runnable, WorkProgressDispatcher {
// Catch everything to make sure that it gets logged properly and
// not to kill any threads from the underlying thread pool.
- log.error("Runtime error caught during grid runnable execution: " + this, e);
+ log.error("Runtime error caught during ignite runnable execution: " + this, e);
if (e instanceof Error) {
throw e;
@@ -138,11 +138,11 @@ public abstract class IgniteWorker implements Runnable, WorkProgressDispatcher {
if (log.isDebugEnabled()) {
if (isCancelled.get()) {
- log.debug("Grid runnable finished due to cancellation: " + name);
+ log.debug("Ignite runnable finished due to cancellation: " + name);
} else if (runner.isInterrupted()) {
- log.debug("Grid runnable finished due to interruption without cancellation: " + name);
+ log.debug("Ignite runnable finished due to interruption without cancellation: " + name);
} else {
- log.debug("Grid runnable finished normally: " + name);
+ log.debug("Ignite runnable finished normally: " + name);
}
}
@@ -194,7 +194,7 @@ public abstract class IgniteWorker implements Runnable, WorkProgressDispatcher {
*/
public void cancel() {
if (log.isDebugEnabled()) {
- log.debug("Cancelling grid runnable: " + this);
+ log.debug("Cancelling ignite runnable: " + this);
}
onCancel(isCancelled.compareAndSet(false, true));
@@ -207,7 +207,7 @@ public abstract class IgniteWorker implements Runnable, WorkProgressDispatcher {
*/
public void join() throws InterruptedException {
if (log.isDebugEnabled()) {
- log.debug("Joining grid runnable: " + this);
+ log.debug("Joining ignite runnable: " + this);
}
if ((runner == null && isCancelled.get()) || finished) {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java
index 66991eedf..44f6dce12 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java
@@ -26,6 +26,7 @@ import java.nio.file.Path;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.tostring.SensitiveDataLoggingPolicy;
import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -50,14 +51,14 @@ public abstract class BaseIgniteAbstractTest {
* Should be invoked before a test will start.
*
* @param testInfo Test information object.
- * @param workDir Work directory.
+ * @param workDir Work directory.
*/
- protected void setupBase(TestInfo testInfo, Path workDir) {
+ protected void setupBase(TestInfo testInfo, @Nullable Path workDir) {
log.info(">>> Starting test: {}#{}, displayName: {}, workDir: {}",
testInfo.getTestClass().map(Class::getSimpleName).orElse("<null>"),
testInfo.getTestMethod().map(Method::getName).orElse("<null>"),
testInfo.getDisplayName(),
- workDir.toAbsolutePath());
+ workDir == null ? "<null>" : workDir.toAbsolutePath());
this.testStartMs = monotonicMs();
}
diff --git a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/persistence/ItBplusTreePageMemoryImplTest.java b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/persistence/ItBplusTreePageMemoryImplTest.java
index 18f9e2db0..88163cd51 100644
--- a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/persistence/ItBplusTreePageMemoryImplTest.java
+++ b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/persistence/ItBplusTreePageMemoryImplTest.java
@@ -17,13 +17,13 @@
package org.apache.ignite.internal.pagememory.persistence;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointTestUtils.mockCheckpointTimeoutLock;
import static org.apache.ignite.internal.util.Constants.MiB;
import java.util.concurrent.TimeUnit;
import java.util.stream.LongStream;
import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.TestPageIoRegistry;
-import org.apache.ignite.internal.pagememory.mem.unsafe.UnsafeMemoryProvider;
import org.apache.ignite.internal.pagememory.tree.BplusTree;
import org.apache.ignite.internal.pagememory.tree.ItBplusTreeSelfTest;
@@ -45,15 +45,15 @@ public class ItBplusTreePageMemoryImplTest extends ItBplusTreeSelfTest {
ioRegistry.loadFromServiceLoader();
return new PageMemoryImpl(
- new UnsafeMemoryProvider(null),
dataRegionCfg,
ioRegistry,
sizes,
new TestPageReadWriteManager(),
- (page, fullPageId, pageMemoryEx) -> {
+ (page, fullPageId, pageMemoryImpl) -> {
},
(fullPageId, buf, tag) -> {
},
+ mockCheckpointTimeoutLock(log, true),
PAGE_SIZE
);
}
diff --git a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/persistence/ItBplusTreeReuseListPageMemoryImplTest.java b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/persistence/ItBplusTreeReuseListPageMemoryImplTest.java
index 9324f29e3..bb428a90d 100644
--- a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/persistence/ItBplusTreeReuseListPageMemoryImplTest.java
+++ b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/persistence/ItBplusTreeReuseListPageMemoryImplTest.java
@@ -17,13 +17,13 @@
package org.apache.ignite.internal.pagememory.persistence;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointTestUtils.mockCheckpointTimeoutLock;
import static org.apache.ignite.internal.util.Constants.MiB;
import java.util.concurrent.TimeUnit;
import java.util.stream.LongStream;
import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.TestPageIoRegistry;
-import org.apache.ignite.internal.pagememory.mem.unsafe.UnsafeMemoryProvider;
import org.apache.ignite.internal.pagememory.tree.ItBplusTreeReuseSelfTest;
/**
@@ -44,15 +44,15 @@ public class ItBplusTreeReuseListPageMemoryImplTest extends ItBplusTreeReuseSelf
ioRegistry.loadFromServiceLoader();
return new PageMemoryImpl(
- new UnsafeMemoryProvider(null),
dataRegionCfg,
ioRegistry,
sizes,
new TestPageReadWriteManager(),
- (page, fullPageId, pageMemoryEx) -> {
+ (page, fullPageId, pageMemoryImpl) -> {
},
(fullPageId, buf, tag) -> {
},
+ mockCheckpointTimeoutLock(log, true),
PAGE_SIZE
);
}
diff --git a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/ItBplusTreeReplaceRemoveRaceTest.java b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/ItBplusTreeReplaceRemoveRaceTest.java
index 952fec989..68e913e39 100644
--- a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/ItBplusTreeReplaceRemoveRaceTest.java
+++ b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/ItBplusTreeReplaceRemoveRaceTest.java
@@ -38,7 +38,6 @@ import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryData
import org.apache.ignite.internal.pagememory.configuration.schema.UnsafeMemoryAllocatorConfigurationSchema;
import org.apache.ignite.internal.pagememory.impl.PageMemoryNoStoreImpl;
import org.apache.ignite.internal.pagememory.io.IoVersions;
-import org.apache.ignite.internal.pagememory.mem.unsafe.UnsafeMemoryProvider;
import org.apache.ignite.internal.pagememory.tree.io.BplusInnerIo;
import org.apache.ignite.internal.pagememory.tree.io.BplusIo;
import org.apache.ignite.internal.pagememory.tree.io.BplusLeafIo;
@@ -95,7 +94,6 @@ public class ItBplusTreeReplaceRemoveRaceTest extends BaseIgniteAbstractTest {
ioRegistry.loadFromServiceLoader();
return new PageMemoryNoStoreImpl(
- new UnsafeMemoryProvider(null),
dataRegionCfg,
ioRegistry,
512
diff --git a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/ItBplusTreeSelfTest.java b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/ItBplusTreeSelfTest.java
index d062e8a41..edd546362 100644
--- a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/ItBplusTreeSelfTest.java
+++ b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/ItBplusTreeSelfTest.java
@@ -83,7 +83,6 @@ import org.apache.ignite.internal.pagememory.configuration.schema.UnsafeMemoryAl
import org.apache.ignite.internal.pagememory.datastructure.DataStructure;
import org.apache.ignite.internal.pagememory.impl.PageMemoryNoStoreImpl;
import org.apache.ignite.internal.pagememory.io.IoVersions;
-import org.apache.ignite.internal.pagememory.mem.unsafe.UnsafeMemoryProvider;
import org.apache.ignite.internal.pagememory.reuse.ReuseList;
import org.apache.ignite.internal.pagememory.tree.BplusTree.TreeRowClosure;
import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure;
@@ -105,6 +104,7 @@ import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
/**
@@ -155,7 +155,9 @@ public class ItBplusTreeSelfTest extends BaseIgniteAbstractTest {
private volatile CompletableFuture<?> asyncRunFut;
@BeforeEach
- protected void beforeEach() throws Exception {
+ protected void beforeEach(TestInfo testInfo) throws Exception {
+ setupBase(testInfo, null);
+
stop.set(false);
long seed = System.nanoTime();
@@ -172,7 +174,7 @@ public class ItBplusTreeSelfTest extends BaseIgniteAbstractTest {
}
@AfterEach
- protected void afterTest() throws Exception {
+ protected void afterTest(TestInfo testInfo) throws Exception {
rnd = null;
try {
@@ -211,6 +213,8 @@ public class ItBplusTreeSelfTest extends BaseIgniteAbstractTest {
RMV_INC = -1;
CNT = 10;
}
+
+ tearDownBase(testInfo);
}
/**
@@ -2740,7 +2744,6 @@ public class ItBplusTreeSelfTest extends BaseIgniteAbstractTest {
ioRegistry.loadFromServiceLoader();
return new PageMemoryNoStoreImpl(
- new UnsafeMemoryProvider(null),
dataRegionCfg,
ioRegistry,
PAGE_SIZE
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java
index 0ee7ba82f..c58792b3d 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java
@@ -65,9 +65,10 @@ public class PageMemoryCheckpointConfigurationSchema {
@Value(hasDefault = true)
public long readLockTimeout = 10_000;
- /** Enables log checkpoint read lock holders. */
+ /** Threshold for logging (if greater than zero) read lock holders in milliseconds. */
+ @Range(min = 0)
@Value(hasDefault = true)
- public boolean logReadLockHolders = false;
+ public long logReadLockThresholdTimeout = 0;
/** Use an asynchronous file I/O operations provider. */
@Value(hasDefault = true)
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoStoreImpl.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoStoreImpl.java
index 17359e50d..97a9ef2d6 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoStoreImpl.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoStoreImpl.java
@@ -29,11 +29,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryDataRegionConfiguration;
import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryDataRegionView;
+import org.apache.ignite.internal.pagememory.configuration.schema.UnsafeMemoryAllocatorView;
import org.apache.ignite.internal.pagememory.io.PageIo;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
import org.apache.ignite.internal.pagememory.mem.DirectMemoryProvider;
import org.apache.ignite.internal.pagememory.mem.DirectMemoryRegion;
import org.apache.ignite.internal.pagememory.mem.IgniteOutOfMemoryException;
+import org.apache.ignite.internal.pagememory.mem.unsafe.UnsafeMemoryProvider;
import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
import org.apache.ignite.internal.pagememory.util.PageIdUtils;
@@ -167,23 +169,26 @@ public class PageMemoryNoStoreImpl implements PageMemory {
/**
* Constructor.
*
- * @param directMemoryProvider Memory allocator to use.
* @param dataRegionConfig Data region configuration.
* @param ioRegistry IO registry.
* @param pageSize Page size in bytes.
*/
public PageMemoryNoStoreImpl(
- DirectMemoryProvider directMemoryProvider,
PageMemoryDataRegionConfiguration dataRegionConfig,
PageIoRegistry ioRegistry,
// TODO: IGNITE-17017 Move to common config
int pageSize
) {
- this.directMemoryProvider = directMemoryProvider;
this.ioRegistry = ioRegistry;
this.trackAcquiredPages = false;
this.dataRegionConfigView = dataRegionConfig.value();
+ if (!(dataRegionConfigView.memoryAllocator() instanceof UnsafeMemoryAllocatorView)) {
+ throw new IgniteInternalException("Unexpected memory allocator: " + dataRegionConfigView.memoryAllocator());
+ }
+
+ directMemoryProvider = new UnsafeMemoryProvider(null);
+
sysPageSize = pageSize + PAGE_OVERHEAD;
assert sysPageSize % 8 == 0 : sysPageSize;
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryEx.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryEx.java
deleted file mode 100644
index a71638b82..000000000
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryEx.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.pagememory.persistence;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.ignite.internal.pagememory.PageMemory;
-import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
-import org.apache.ignite.lang.IgniteInternalCheckedException;
-
-/**
- * Page memory with some persistence related additions.
- */
-//TODO IGNITE-16350 Improve javadoc in this class.
-//TODO IGNITE-16350 Consider removing this interface.
-public interface PageMemoryEx extends PageMemory {
- /**
- * Acquires a read lock associated with the given page.
- *
- * @param absPtr Absolute pointer to read lock.
- * @param pageId Page ID.
- * @param force Force flag.
- * @param touch Update page timestamp.
- * @return Pointer to the page read buffer.
- */
- long readLock(long absPtr, long pageId, boolean force, boolean touch);
-
- /**
- * Acquired a write lock on the page.
- *
- * @param grpId Group ID.
- * @param pageId Page ID.
- * @param page Page pointer.
- * @param restore Determines if the page is locked for restore memory (crash recovery).
- * @return Pointer to the page read buffer.
- */
- long writeLock(int grpId, long pageId, long page, boolean restore);
-
- /**
- * Releases locked page.
- *
- * @param grpId Group ID.
- * @param pageId Page ID.
- * @param page Page pointer.
- * @param dirtyFlag Determines whether the page was modified since the last checkpoint.
- * @param restore Determines if the page is locked for restore.
- */
- void writeUnlock(int grpId, long pageId, long page, boolean dirtyFlag, boolean restore);
-
- /**
- * Gets partition metadata page ID for specified grpId and partId.
- *
- * @param grpId Group ID.
- * @param partId Partition ID.
- * @return Meta page for grpId and partId.
- */
- long partitionMetaPageId(int grpId, int partId);
-
- /**
- * Returns an absolute pointer to a page, associated with the given page ID.
- *
- * @param grpId Group ID.
- * @param pageId Page ID.
- * @param pageAllocated Flag is set if new page was allocated in offheap memory.
- * @return Page.
- * @throws IgniteInternalCheckedException If failed.
- * @see #acquirePage(int, long) Sets additional flag indicating that page was not found in memory and had to be allocated.
- */
- long acquirePage(int grpId, long pageId, AtomicBoolean pageAllocated) throws IgniteInternalCheckedException;
-
- /**
- * Returns an absolute pointer to a page, associated with the given page ID.
- *
- * @param grpId Group ID.
- * @param pageId Page id.
- * @param restore Get page for restore
- * @return Page.
- * @throws IgniteInternalCheckedException If failed.
- * @throws StorageException If page reading failed from storage.
- * @see #acquirePage(int, long) Will read page from file if it is not present in memory.
- */
- long acquirePage(int grpId, long pageId, IoStatisticsHolder statHldr, boolean restore) throws IgniteInternalCheckedException;
-
- /**
- * Marks partition as invalid / outdated.
- *
- * @param grpId Group ID.
- * @param partId Partition ID.
- * @return New partition generation (growing 1-based partition file version).
- */
- int invalidate(int grpId, int partId);
-
- /**
- * Clears internal metadata of destroyed group.
- *
- * @param grpId Group ID.
- */
- void onCacheGroupDestroyed(int grpId);
-
- /**
- * Total pages can be placed to memory.
- */
- long totalPages();
-}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java
index 7cad66c1f..7569db9f0 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java
@@ -74,16 +74,20 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryDataRegionConfiguration;
import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryDataRegionView;
+import org.apache.ignite.internal.pagememory.configuration.schema.UnsafeMemoryAllocatorView;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
import org.apache.ignite.internal.pagememory.mem.DirectMemoryProvider;
import org.apache.ignite.internal.pagememory.mem.DirectMemoryRegion;
import org.apache.ignite.internal.pagememory.mem.IgniteOutOfMemoryException;
+import org.apache.ignite.internal.pagememory.mem.unsafe.UnsafeMemoryProvider;
import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointMetricsTracker;
import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointPages;
+import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointTimeoutLock;
import org.apache.ignite.internal.pagememory.persistence.replacement.ClockPageReplacementPolicyFactory;
import org.apache.ignite.internal.pagememory.persistence.replacement.DelayedPageReplacementTracker;
import org.apache.ignite.internal.pagememory.persistence.replacement.PageReplacementPolicy;
@@ -99,7 +103,9 @@ import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
/**
- * Page header structure is described by the following diagram.
+ * Page memory with some persistence related additions.
+ *
+ * <p>Page header structure is described by the following diagram.
*
* <p>When page is not allocated (in a free list):
* <pre>
@@ -123,7 +129,7 @@ import org.jetbrains.annotations.TestOnly;
* in use or not.
*/
@SuppressWarnings({"LockAcquiredButNotSafelyReleased"})
-public class PageMemoryImpl implements PageMemoryEx {
+public class PageMemoryImpl implements PageMemory {
/** Logger. */
private static final IgniteLogger LOG = IgniteLogger.forClass(PageMemoryImpl.class);
@@ -152,7 +158,7 @@ public class PageMemoryImpl implements PageMemoryEx {
private final PageIoRegistry ioRegistry;
/** Page manager. */
- private final PageReadWriteManager pmPageMgr;
+ private final PageReadWriteManager pageStoreManager;
/** Page size. */
private final int sysPageSize;
@@ -207,36 +213,45 @@ public class PageMemoryImpl implements PageMemoryEx {
/** Flush dirty page closure. */
private final PageStoreWriter flushDirtyPage;
+ /** Checkpoint timeout lock. */
+ private final CheckpointTimeoutLock checkpointTimeoutLock;
+
/**
* Constructor.
*
- * @param directMemoryProvider Memory allocator to use.
* @param dataRegionConfig Data region configuration.
* @param ioRegistry IO registry.
* @param sizes Segments sizes, the last one being the checkpoint buffer size.
- * @param pmPageMgr Page store manager.
+ * @param pageStoreManager Page store manager.
* @param changeTracker Callback invoked to track changes in pages.
* @param flushDirtyPage Write callback invoked when a dirty page is removed for replacement.
+ * @param checkpointTimeoutLock Checkpoint timeout lock.
* @param pageSize Page size in bytes.
*/
public PageMemoryImpl(
- DirectMemoryProvider directMemoryProvider,
PageMemoryDataRegionConfiguration dataRegionConfig,
PageIoRegistry ioRegistry,
long[] sizes,
- PageReadWriteManager pmPageMgr,
+ PageReadWriteManager pageStoreManager,
@Nullable PageChangeTracker changeTracker,
PageStoreWriter flushDirtyPage,
+ CheckpointTimeoutLock checkpointTimeoutLock,
// TODO: IGNITE-17017 Move to common config
int pageSize
) {
- this.directMemoryProvider = directMemoryProvider;
this.dataRegionConfigView = dataRegionConfig.value();
this.ioRegistry = ioRegistry;
this.sizes = sizes;
- this.pmPageMgr = pmPageMgr;
+ this.pageStoreManager = pageStoreManager;
this.changeTracker = changeTracker;
this.flushDirtyPage = flushDirtyPage;
+ this.checkpointTimeoutLock = checkpointTimeoutLock;
+
+ if (!(dataRegionConfigView.memoryAllocator() instanceof UnsafeMemoryAllocatorView)) {
+ throw new IgniteInternalException("Unexpected memory allocator: " + dataRegionConfigView.memoryAllocator());
+ }
+
+ directMemoryProvider = new UnsafeMemoryProvider(null);
sysPageSize = pageSize + PAGE_OVERHEAD;
@@ -379,8 +394,15 @@ public class PageMemoryImpl implements PageMemoryEx {
return readLock(page, pageId, false);
}
- /** {@inheritDoc} */
- @Override
+ /**
+ * Acquires a read lock associated with the given page.
+ *
+ * @param absPtr Absolute pointer to read lock.
+ * @param pageId Page ID.
+ * @param force Force flag.
+ * @param touch Update page timestamp.
+ * @return Pointer to the page read buffer.
+ */
public long readLock(long absPtr, long pageId, boolean force, boolean touch) {
assert started;
@@ -421,8 +443,15 @@ public class PageMemoryImpl implements PageMemoryEx {
return writeLock(grpId, pageId, page, false);
}
- /** {@inheritDoc} */
- @Override
+ /**
+ * Acquired a write lock on the page.
+ *
+ * @param grpId Group ID.
+ * @param pageId Page ID.
+ * @param page Page pointer.
+ * @param restore Determines if the page is locked for restore memory (crash recovery).
+ * @return Pointer to the page read buffer.
+ */
public long writeLock(int grpId, long pageId, long page, boolean restore) {
assert started;
@@ -445,8 +474,15 @@ public class PageMemoryImpl implements PageMemoryEx {
writeUnlock(grpId, pageId, page, dirtyFlag, false);
}
- /** {@inheritDoc} */
- @Override
+ /**
+ * Releases locked page.
+ *
+ * @param grpId Group ID.
+ * @param pageId Page ID.
+ * @param page Page pointer.
+ * @param dirtyFlag Determines whether the page was modified since the last checkpoint.
+ * @param restore Determines if the page is locked for restore.
+ */
public void writeUnlock(int grpId, long pageId, long page, boolean dirtyFlag, boolean restore) {
assert started;
@@ -476,13 +512,12 @@ public class PageMemoryImpl implements PageMemoryEx {
assert partId <= MAX_PARTITION_ID || partId == INDEX_PARTITION && flags == FLAG_AUX : "flags = " + flags + ", partId = " + partId;
assert started;
+ assert checkpointTimeoutLock.checkpointLockIsHeldByThread();
- long pageId = pmPageMgr.allocatePage(grpId, partId, flags);
-
- assert pageIndex(pageId) > 0; //it's crucial for tracking pages (zero page is super one)
+ long pageId = pageStoreManager.allocatePage(grpId, partId, flags);
// We need to allocate page in memory for marking it dirty to save it in the next checkpoint.
- // Otherwise it is possible that on file will be empty page which will be saved at snapshot and read with error
+ // Otherwise, it is possible that on file will be empty page which will be saved at snapshot and read with error
// because there is no crc inside them.
Segment seg = segment(grpId, pageId);
@@ -566,8 +601,13 @@ public class PageMemoryImpl implements PageMemoryEx {
return false;
}
- /** {@inheritDoc} */
- @Override
+ /**
+ * Gets partition metadata page ID for specified grpId and partId.
+ *
+ * @param grpId Group ID.
+ * @param partId Partition ID.
+ * @return Meta page for grpId and partId.
+ */
public long partitionMetaPageId(int grpId, int partId) {
assert started;
@@ -587,14 +627,30 @@ public class PageMemoryImpl implements PageMemoryEx {
return acquirePage(grpId, pageId, statHolder, false);
}
- /** {@inheritDoc} */
- @Override
+ /**
+ * Returns an absolute pointer to a page, associated with the given page ID.
+ *
+ * @param grpId Group ID.
+ * @param pageId Page ID.
+ * @param pageAllocated Flag is set if new page was allocated in offheap memory.
+ * @return Page.
+ * @throws IgniteInternalCheckedException If failed.
+ * @see #acquirePage(int, long) Sets additional flag indicating that page was not found in memory and had to be allocated.
+ */
public long acquirePage(int grpId, long pageId, AtomicBoolean pageAllocated) throws IgniteInternalCheckedException {
return acquirePage(grpId, pageId, IoStatisticsHolderNoOp.INSTANCE, false, pageAllocated);
}
- /** {@inheritDoc} */
- @Override
+ /**
+ * Returns an absolute pointer to a page, associated with the given page ID.
+ *
+ * @param grpId Group ID.
+ * @param pageId Page id.
+ * @param restore Get page for restore
+ * @return Page.
+ * @throws IgniteInternalCheckedException If failed.
+ * @see #acquirePage(int, long) Will read page from file if it is not present in memory.
+ */
public long acquirePage(int grpId, long pageId, IoStatisticsHolder statHolder, boolean restore) throws IgniteInternalCheckedException {
return acquirePage(grpId, pageId, statHolder, restore, null);
}
@@ -760,7 +816,7 @@ public class PageMemoryImpl implements PageMemoryEx {
long actualPageId = 0;
try {
- pmPageMgr.read(grpId, pageId, buf, false);
+ pageStoreManager.read(grpId, pageId, buf, false);
statHolder.trackPhysicalAndLogicalRead(pageAddr);
@@ -793,7 +849,6 @@ public class PageMemoryImpl implements PageMemoryEx {
/**
* Returns total pages can be placed in all segments.
*/
- @Override
public long totalPages() {
if (segments == null) {
return 0;
@@ -857,8 +912,13 @@ public class PageMemoryImpl implements PageMemoryEx {
);
}
- /** {@inheritDoc} */
- @Override
+ /**
+ * Marks partition as invalid / outdated.
+ *
+ * @param grpId Group ID.
+ * @param partId Partition ID.
+ * @return New partition generation (growing 1-based partition file version).
+ */
public int invalidate(int grpId, int partId) {
synchronized (segmentsLock) {
if (!started) {
@@ -887,8 +947,11 @@ public class PageMemoryImpl implements PageMemoryEx {
}
}
- /** {@inheritDoc} */
- @Override
+ /**
+ * Clears internal metadata of destroyed group.
+ *
+ * @param grpId Group ID.
+ */
public void onCacheGroupDestroyed(int grpId) {
for (Segment seg : segments) {
seg.writeLock().lock();
@@ -1180,7 +1243,7 @@ public class PageMemoryImpl implements PageMemoryEx {
boolean wasDirty = dirty(absPtr, dirty);
if (dirty) {
- // TODO: IGNITE-16984 Don't forget add assertion for checkpoint lock held by this thread
+ assert checkpointTimeoutLock.checkpointLockIsHeldByThread();
if (!wasDirty || forceAdd) {
Segment seg = segment(pageId.groupId(), pageId.pageId());
@@ -1430,7 +1493,7 @@ public class PageMemoryImpl implements PageMemoryEx {
// Can evict a dirty page only if should be written by a checkpoint.
// These pages does not have tmp buffer.
if (checkpointPages != null && checkpointPages.allowToSave(fullPageId)) {
- assert pmPageMgr != null;
+ assert pageStoreManager != null;
PageStoreWriter saveDirtyPage = delayedPageReplacementTracker != null
? delayedPageReplacementTracker.delayedPageWrite() : flushDirtyPage;
@@ -1716,9 +1779,9 @@ public class PageMemoryImpl implements PageMemoryEx {
*
* @param page – Page pointer.
* @param fullPageId Full page ID.
- * @param pageMemoryEx Page memory.
+ * @param pageMemoryImpl Page memory.
*/
- void apply(long page, FullPageId fullPageId, PageMemoryEx pageMemoryEx);
+ void apply(long page, FullPageId fullPageId, PageMemoryImpl pageMemoryImpl);
}
/**
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
new file mode 100644
index 000000000..2a8dd9ece
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import java.nio.file.Path;
+import java.util.Collection;
+import org.apache.ignite.internal.components.LongJvmPauseDetector;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
+import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration;
+import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointView;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.worker.IgniteWorkerListener;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Main class to abstract checkpoint-related processes and actions and hide them from higher-level components.
+ *
+ * <p>Implements sharp checkpointing algorithm.
+ *
+ * <p>Represents only an intermediate step in refactoring of checkpointing component and may change in the future.
+ *
+ * <p>This checkpoint ensures that all pages marked as dirty under {@link #checkpointTimeoutLock} will be consistently saved to disk.
+ *
+ * <p>Configuration of this checkpoint allows the following:
+ * <ul>
+ * <li>Collecting all pages from configured dataRegions which was marked as dirty under {@link #checkpointTimeoutLock}.</li>
+ * <li>Marking the start of checkpoint on disk.</li>
+ * <li>Notifying the subscribers of different checkpoint states through {@link CheckpointListener}.</li>
+ * <li>Synchronizing collected pages with disk using {@link FilePageStoreManager}.</li>
+ * </ul>
+ */
+public class CheckpointManager implements IgniteComponent {
+ /** Checkpoint worker. */
+ private final Checkpointer checkpointer;
+
+ /** Main checkpoint steps. */
+ private final CheckpointWorkflow checkpointWorkflow;
+
+ /** Checkpoint markers storage which mark the start and end of each checkpoint. */
+ private final CheckpointMarkersStorage checkpointMarkersStorage;
+
+ /** Timeout checkpoint lock which should be used while write to memory happened. */
+ private final CheckpointTimeoutLock checkpointTimeoutLock;
+
+ /** Checkpoint page writer factory. */
+ private final CheckpointPagesWriterFactory checkpointPagesWriterFactory;
+
+ /**
+ * Constructor.
+ *
+ * @param igniteInstanceName Ignite instance name.
+ * @param checkpointConfig Checkpoint configuration.
+ * @param workerListener Listener for life-cycle checkpoint worker events.
+ * @param longJvmPauseDetector Long JVM pause detector.
+ * @param filePageStoreManager File page store manager.
+ * @param dataRegions Data regions.
+ * @param storagePath Storage path.
+ * @param pageSize Page size in bytes.
+ * @throws IgniteInternalCheckedException If failed.
+ */
+ public CheckpointManager(
+ String igniteInstanceName,
+ @Nullable IgniteWorkerListener workerListener,
+ @Nullable LongJvmPauseDetector longJvmPauseDetector,
+ PageMemoryCheckpointConfiguration checkpointConfig,
+ FilePageStoreManager filePageStoreManager,
+ Collection<PageMemoryDataRegion> dataRegions,
+ Path storagePath,
+ // TODO: IGNITE-17017 Move to common config
+ int pageSize
+ ) throws IgniteInternalCheckedException {
+ PageMemoryCheckpointView checkpointConfigView = checkpointConfig.value();
+
+ long logReadLockThresholdTimeout = checkpointConfigView.logReadLockThresholdTimeout();
+
+ ReentrantReadWriteLockWithTracking reentrantReadWriteLockWithTracking = logReadLockThresholdTimeout > 0
+ ? new ReentrantReadWriteLockWithTracking(IgniteLogger.forClass(CheckpointReadWriteLock.class), logReadLockThresholdTimeout)
+ : new ReentrantReadWriteLockWithTracking();
+
+ CheckpointReadWriteLock checkpointReadWriteLock = new CheckpointReadWriteLock(reentrantReadWriteLockWithTracking);
+
+ checkpointMarkersStorage = new CheckpointMarkersStorage(storagePath);
+
+ checkpointWorkflow = new CheckpointWorkflow(
+ checkpointConfig,
+ checkpointMarkersStorage,
+ checkpointReadWriteLock,
+ dataRegions
+ );
+
+ checkpointPagesWriterFactory = new CheckpointPagesWriterFactory(
+ IgniteLogger.forClass(CheckpointPagesWriterFactory.class),
+ (fullPage, buf, tag) -> filePageStoreManager.write(fullPage.groupId(), fullPage.pageId(), buf, tag, true),
+ pageSize
+ );
+
+ checkpointer = new Checkpointer(
+ IgniteLogger.forClass(Checkpoint.class),
+ igniteInstanceName,
+ workerListener,
+ longJvmPauseDetector,
+ checkpointWorkflow,
+ checkpointPagesWriterFactory,
+ checkpointConfig
+ );
+
+ checkpointTimeoutLock = new CheckpointTimeoutLock(
+ IgniteLogger.forClass(CheckpointTimeoutLock.class),
+ checkpointReadWriteLock,
+ checkpointConfigView.readLockTimeout(),
+ () -> safeToUpdateAllPageMemories(dataRegions),
+ checkpointer
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void start() {
+ checkpointWorkflow.start();
+
+ checkpointer.start();
+
+ checkpointTimeoutLock.start();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void stop() throws Exception {
+ IgniteUtils.closeAll(
+ checkpointTimeoutLock::stop,
+ checkpointer::stop,
+ checkpointWorkflow::stop
+ );
+ }
+
+ /**
+ * Returns checkpoint timeout lock which can be used for protection of writing to memory.
+ */
+ public CheckpointTimeoutLock checkpointTimeoutLock() {
+ return checkpointTimeoutLock;
+ }
+
+ /**
+ * Adds a listener to be called for the corresponding persistent data region.
+ *
+ * @param listener Listener.
+ * @param dataRegion Persistent data region for which listener is corresponded to, {@code null} for all regions.
+ */
+ public void addCheckpointListener(CheckpointListener listener, PageMemoryDataRegion dataRegion) {
+ checkpointWorkflow.addCheckpointListener(listener, dataRegion);
+ }
+
+ /**
+ * Removes the listener.
+ *
+ * @param listener Listener.
+ */
+ public void removeCheckpointListener(CheckpointListener listener) {
+ checkpointWorkflow.removeCheckpointListener(listener);
+ }
+
+ /**
+ * Start the new checkpoint immediately.
+ *
+ * @param reason Checkpoint reason.
+ * @return Triggered checkpoint progress.
+ */
+ public CheckpointProgress forceCheckpoint(String reason) {
+ return checkpointer.scheduleCheckpoint(0, reason);
+ }
+
+ /**
+ * Returns {@link true} if it is safe for all {@link PageMemoryDataRegion data regions} to update their {@link PageMemory}.
+ *
+ * @param dataRegions Data regions.
+ * @see PageMemoryImpl#safeToUpdate()
+ */
+ static boolean safeToUpdateAllPageMemories(Collection<PageMemoryDataRegion> dataRegions) {
+ for (PageMemoryDataRegion dataRegion : dataRegions) {
+ if (dataRegion.persistent() && !((PageMemoryImpl) dataRegion.pageMemory()).safeToUpdate()) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLock.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLock.java
index 2b10164a2..6ca595a5d 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLock.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLock.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.Checkpointer.CHECKPOINT_RUNNER_THREAD_PREFIX;
+
import java.util.concurrent.TimeUnit;
import org.apache.ignite.lang.IgniteInternalException;
@@ -26,13 +28,6 @@ import org.apache.ignite.lang.IgniteInternalException;
public class CheckpointReadWriteLock {
private final ThreadLocal<Integer> checkpointReadLockHoldCount = ThreadLocal.withInitial(() -> 0);
- /**
- * Any thread with a such prefix is managed by the checkpoint. So some conditions can rely on it(ex. we don't need a checkpoint lock
- * there because checkpoint is already held write lock).
- */
- // TODO: IGNITE-16984 I think it needs to be redone or relocated
- static final String CHECKPOINT_RUNNER_THREAD_PREFIX = "checkpoint-runner";
-
/** Checkpoint lock. */
private final ReentrantReadWriteLockWithTracking checkpointLock;
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
index c3ce00339..88d788e59 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.pagememory.FullPageId;
import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration;
+import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointView;
import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInternalCheckedException;
@@ -101,10 +102,12 @@ class CheckpointWorkflow implements IgniteComponent {
CheckpointReadWriteLock checkpointReadWriteLock,
Collection<PageMemoryDataRegion> dataRegions
) {
+ PageMemoryCheckpointView checkpointConfigView = checkpointConfig.value();
+
this.checkpointMarkersStorage = checkpointMarkersStorage;
this.checkpointReadWriteLock = checkpointReadWriteLock;
- this.checkpointWriteOrder = CheckpointWriteOrder.valueOf(checkpointConfig.writeOrder().value());
- this.parallelSortThreshold = checkpointConfig.parallelSortThreshold().value();
+ this.checkpointWriteOrder = CheckpointWriteOrder.valueOf(checkpointConfigView.writeOrder());
+ this.parallelSortThreshold = checkpointConfigView.parallelSortThreshold();
this.dataRegions = dataRegions;
}
@@ -214,7 +217,9 @@ class CheckpointWorkflow implements IgniteComponent {
}
}
- checkpointMarkersStorage.onCheckpointEnd(chp.progress.id());
+ if (chp.hasDelta()) {
+ checkpointMarkersStorage.onCheckpointEnd(chp.progress.id());
+ }
for (CheckpointListener listener : collectCheckpointListeners(dataRegions)) {
listener.afterCheckpointEnd(chp.progress);
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
index fca733a88..9076e9bd0 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
@@ -22,8 +22,6 @@ import static java.lang.System.nanoTime;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointReadWriteLock.CHECKPOINT_RUNNER_THREAD_PREFIX;
-import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_TAKEN;
import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
@@ -40,7 +38,6 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.LongAdder;
-import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import org.apache.ignite.internal.components.LongJvmPauseDetector;
import org.apache.ignite.internal.manager.IgniteComponent;
@@ -97,6 +94,13 @@ public class Checkpointer extends IgniteWorker implements IgniteComponent {
+ "pages=%d, "
+ "reason='%s']";
+ /**
+ * Any thread with a such prefix is managed by the checkpoint.
+ *
+ * <p>So some conditions can rely on it(ex. we don't need a checkpoint lock there because checkpoint is already held write lock).
+ */
+ static final String CHECKPOINT_RUNNER_THREAD_PREFIX = "checkpoint-runner";
+
/** Pause detector. */
@Nullable
private final LongJvmPauseDetector pauseDetector;
@@ -218,39 +222,14 @@ public class Checkpointer extends IgniteWorker implements IgniteComponent {
* @return Nearest scheduled checkpoint which is not started yet (dirty pages weren't collected yet).
*/
public CheckpointProgress scheduleCheckpoint(long delayFromNow, String reason) {
- return scheduleCheckpoint(delayFromNow, reason, null);
- }
-
- /**
- * Changes the information for a scheduled checkpoint if it was scheduled further than {@code delayFromNow}, or do nothing otherwise.
- *
- * @param delayFromNow Delay from now in milliseconds.
- * @param reason Wakeup reason.
- * @param finishFutureListener Checkpoint finish listener.
- * @return Nearest scheduled checkpoint which is not started yet (dirty pages weren't collected yet).
- */
- public CheckpointProgress scheduleCheckpoint(
- long delayFromNow,
- String reason,
- @Nullable BiConsumer<Void, Throwable> finishFutureListener
- ) {
CheckpointProgressImpl current = currentCheckpointProgress;
// If checkpoint haven't taken write lock yet it shouldn't trigger a new checkpoint but should return current one.
- if (finishFutureListener == null && current != null && !current.greaterOrEqualTo(LOCK_TAKEN)) {
+ if (current != null && !current.greaterOrEqualTo(LOCK_TAKEN)) {
return current;
}
- if (finishFutureListener != null) {
- // To be sure finishFutureListener will always be executed in checkpoint thread.
- synchronized (this) {
- current = scheduledCheckpointProgress;
-
- current.futureFor(FINISHED).whenComplete(finishFutureListener);
- }
- } else {
- current = scheduledCheckpointProgress;
- }
+ current = scheduledCheckpointProgress;
long nextNanos = nanoTime() + MILLISECONDS.toNanos(delayFromNow);
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/ReentrantReadWriteLockWithTracking.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/ReentrantReadWriteLockWithTracking.java
index da5a9a44b..7bbf3ab75 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/ReentrantReadWriteLockWithTracking.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/ReentrantReadWriteLockWithTracking.java
@@ -46,7 +46,7 @@ public class ReentrantReadWriteLockWithTracking implements ReadWriteLock {
* lock more than {@code readLockThreshold}.
*
* @param log Ignite logger.
- * @param readLockThreshold ReadLock threshold timeout.
+ * @param readLockThreshold ReadLock threshold timeout in milliseconds.
*/
public ReentrantReadWriteLockWithTracking(IgniteLogger log, long readLockThreshold) {
readLock = new ReadLockWithTracking(delegate, log, readLockThreshold);
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTestUtils.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/PageMemoryTestUtils.java
similarity index 59%
copy from modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTestUtils.java
copy to modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/PageMemoryTestUtils.java
index 1459eb2ea..b0c07b2ff 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTestUtils.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/PageMemoryTestUtils.java
@@ -15,35 +15,22 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+package org.apache.ignite.internal.pagememory;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
-import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
-import org.apache.ignite.lang.IgniteLogger;
-
/**
- * Useful class for testing a checkpoint.
+ * Useful class for testing a {@link PageMemory}.
*/
-class CheckpointTestUtils {
- /**
- * Returns new instance of {@link CheckpointReadWriteLock}.
- *
- * @param log Logger.
- */
- static CheckpointReadWriteLock newReadWriteLock(IgniteLogger log) {
- return new CheckpointReadWriteLock(new ReentrantReadWriteLockWithTracking(log, 5_000));
- }
-
+public class PageMemoryTestUtils {
/**
* Returns mocked instance of {@link PageMemoryDataRegion}.
*
- * @param persistent Persistent data region.
- * @param pageMemory Persistent page memory.
+ * @param persistent Data region.
+ * @param pageMemory Page memory.
*/
- static PageMemoryDataRegion newDataRegion(boolean persistent, PageMemoryImpl pageMemory) {
+ public static PageMemoryDataRegion newDataRegion(boolean persistent, PageMemory pageMemory) {
PageMemoryDataRegion mock = mock(PageMemoryDataRegion.class);
when(mock.persistent()).thenReturn(persistent);
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/freelist/AbstractFreeListTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/freelist/AbstractFreeListTest.java
index d0f66f82a..4f2592b79 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/freelist/AbstractFreeListTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/freelist/AbstractFreeListTest.java
@@ -46,7 +46,6 @@ import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryData
import org.apache.ignite.internal.pagememory.configuration.schema.UnsafeMemoryAllocatorConfigurationSchema;
import org.apache.ignite.internal.pagememory.evict.PageEvictionTrackerNoOp;
import org.apache.ignite.internal.pagememory.impl.PageMemoryNoStoreImpl;
-import org.apache.ignite.internal.pagememory.mem.unsafe.UnsafeMemoryProvider;
import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
import org.apache.ignite.internal.pagememory.util.PageLockListenerNoOp;
@@ -180,7 +179,6 @@ public class AbstractFreeListTest extends BaseIgniteAbstractTest {
ioRegistry.load(TestDataPageIo.VERSIONS);
return new PageMemoryNoStoreImpl(
- new UnsafeMemoryProvider(null),
dataRegionCfg,
ioRegistry,
pageSize
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoLoadSelfTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoLoadSelfTest.java
index cd6246673..a77155dd0 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoLoadSelfTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoLoadSelfTest.java
@@ -38,9 +38,7 @@ import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryData
import org.apache.ignite.internal.pagememory.configuration.schema.UnsafeMemoryAllocatorConfigurationSchema;
import org.apache.ignite.internal.pagememory.io.PageIo;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
-import org.apache.ignite.internal.pagememory.mem.DirectMemoryProvider;
import org.apache.ignite.internal.pagememory.mem.IgniteOutOfMemoryException;
-import org.apache.ignite.internal.pagememory.mem.unsafe.UnsafeMemoryProvider;
import org.apache.ignite.internal.pagememory.util.PageIdUtils;
import org.apache.ignite.internal.pagememory.util.PageUtils;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
@@ -60,6 +58,10 @@ public class PageMemoryNoLoadSelfTest extends BaseIgniteAbstractTest {
private static final PageIo PAGE_IO = new TestPageIo();
+ protected static final int GRP_ID = -1;
+
+ protected static final int PARTITION_ID = 1;
+
@InjectConfiguration(polymorphicExtensions = UnsafeMemoryAllocatorConfigurationSchema.class)
protected PageMemoryDataRegionConfiguration dataRegionCfg;
@@ -305,17 +307,13 @@ public class PageMemoryNoLoadSelfTest extends BaseIgniteAbstractTest {
* Creates new page memory instance.
*
* @return Page memory implementation.
- * @throws Exception If failed.
*/
- protected PageMemory memory() throws Exception {
- DirectMemoryProvider provider = new UnsafeMemoryProvider(null);
-
+ protected PageMemory memory() {
PageIoRegistry ioRegistry = new PageIoRegistry();
ioRegistry.loadFromServiceLoader();
return new PageMemoryNoStoreImpl(
- provider,
dataRegionCfg,
ioRegistry,
PAGE_SIZE
@@ -330,8 +328,8 @@ public class PageMemoryNoLoadSelfTest extends BaseIgniteAbstractTest {
* @param page Page pointer.
* @param val Value to write.
*/
- private void writePage(PageMemory mem, FullPageId fullId, long page, int val) {
- long pageAddr = mem.writeLock(-1, fullId.pageId(), page);
+ protected void writePage(PageMemory mem, FullPageId fullId, long page, int val) {
+ long pageAddr = mem.writeLock(GRP_ID, fullId.pageId(), page);
try {
PAGE_IO.initNewPage(pageAddr, fullId.pageId(), mem.realPageSize(fullId.groupId()));
@@ -340,7 +338,7 @@ public class PageMemoryNoLoadSelfTest extends BaseIgniteAbstractTest {
PageUtils.putByte(pageAddr, i, (byte) val);
}
} finally {
- mem.writeUnlock(-1, fullId.pageId(), page, true);
+ mem.writeUnlock(GRP_ID, fullId.pageId(), page, true);
}
}
@@ -355,7 +353,7 @@ public class PageMemoryNoLoadSelfTest extends BaseIgniteAbstractTest {
private void readPage(PageMemory mem, long pageId, long page, int expVal) {
expVal &= 0xFF;
- long pageAddr = mem.readLock(-1, pageId, page);
+ long pageAddr = mem.readLock(GRP_ID, pageId, page);
assert pageAddr != 0;
@@ -366,7 +364,7 @@ public class PageMemoryNoLoadSelfTest extends BaseIgniteAbstractTest {
assertEquals(expVal, val, "Unexpected value at position: " + i);
}
} finally {
- mem.readUnlock(-1, pageId, page);
+ mem.readUnlock(GRP_ID, pageId, page);
}
}
@@ -378,6 +376,6 @@ public class PageMemoryNoLoadSelfTest extends BaseIgniteAbstractTest {
* @throws IgniteInternalCheckedException If failed.
*/
public static FullPageId allocatePage(PageIdAllocator mem) throws IgniteInternalCheckedException {
- return new FullPageId(mem.allocatePage(-1, 1, PageIdAllocator.FLAG_DATA), -1);
+ return new FullPageId(mem.allocatePage(GRP_ID, PARTITION_ID, PageIdAllocator.FLAG_DATA), GRP_ID);
}
}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImplNoLoadTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImplNoLoadTest.java
index 5c1bed2b6..a244f14e1 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImplNoLoadTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImplNoLoadTest.java
@@ -17,28 +17,49 @@
package org.apache.ignite.internal.pagememory.persistence;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.pagememory.PageMemoryTestUtils.newDataRegion;
import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.PAGE_OVERHEAD;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointTestUtils.mockCheckpointTimeoutLock;
import static org.apache.ignite.internal.util.Constants.MiB;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Set;
import java.util.stream.LongStream;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.fileio.RandomAccessFileIoFactory;
import org.apache.ignite.internal.pagememory.FullPageId;
import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
+import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration;
import org.apache.ignite.internal.pagememory.impl.PageMemoryNoLoadSelfTest;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
-import org.apache.ignite.internal.pagememory.mem.unsafe.UnsafeMemoryProvider;
+import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointManager;
+import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
+import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
/**
* Tests {@link PageMemoryImpl}.
*/
+@ExtendWith(WorkDirectoryExtension.class)
public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest {
@BeforeEach
void setUp() throws Exception {
@@ -48,26 +69,7 @@ public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest {
/** {@inheritDoc} */
@Override
protected PageMemory memory() {
- return memory(LongStream.range(0, 10).map(i -> 5 * MiB).toArray());
- }
-
- protected PageMemoryImpl memory(long[] sizes) {
- PageIoRegistry ioRegistry = new PageIoRegistry();
-
- ioRegistry.loadFromServiceLoader();
-
- return new PageMemoryImpl(
- new UnsafeMemoryProvider(null),
- dataRegionCfg,
- ioRegistry,
- sizes,
- new TestPageReadWriteManager(),
- (page, fullPageId, pageMemoryEx) -> {
- },
- (fullPageId, buf, tag) -> {
- },
- PAGE_SIZE
- );
+ return createPageMemoryImpl(defaultSegmentSizes(), null, null);
}
/** {@inheritDoc} */
@@ -78,56 +80,191 @@ public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest {
}
@Test
- void testDirtyPages() throws Exception {
- PageMemoryImpl memory = (PageMemoryImpl) memory();
+ void testDirtyPages(
+ @InjectConfiguration PageMemoryCheckpointConfiguration checkpointConfig,
+ @WorkDirectory Path workDir
+ ) throws Exception {
+ FilePageStoreManager filePageStoreManager = createFilePageStoreManager(workDir);
+
+ Collection<PageMemoryDataRegion> dataRegions = new ArrayList<>();
+
+ CheckpointManager checkpointManager = createCheckpointManager(checkpointConfig, workDir, filePageStoreManager, dataRegions);
+
+ PageMemoryImpl pageMemoryImpl = createPageMemoryImpl(defaultSegmentSizes(), filePageStoreManager, checkpointManager);
+
+ dataRegions.add(newDataRegion(true, pageMemoryImpl));
+
+ filePageStoreManager.start();
- memory.start();
+ checkpointManager.start();
+
+ pageMemoryImpl.start();
try {
- Set<FullPageId> dirtyPages = Set.of(allocatePage(memory), allocatePage(memory));
+ initGroupFilePageStores(filePageStoreManager);
+
+ checkpointManager.checkpointTimeoutLock().checkpointReadLock();
+
+ try {
+ Set<FullPageId> dirtyPages = Set.of(createDirtyPage(pageMemoryImpl), createDirtyPage(pageMemoryImpl));
+
+ assertThat(pageMemoryImpl.dirtyPages(), equalTo(dirtyPages));
+ } finally {
+ checkpointManager.checkpointTimeoutLock().checkpointReadUnlock();
+ }
- assertThat(memory.dirtyPages(), equalTo(dirtyPages));
+ checkpointManager
+ .forceCheckpoint("for_test_flash_dirty_pages")
+ .futureFor(FINISHED)
+ .get(100, MILLISECONDS);
- // TODO: IGNITE-16984 After the checkpoint check that there are no dirty pages
+ assertThat(pageMemoryImpl.dirtyPages(), empty());
} finally {
- memory.stop(true);
+ closeAll(
+ () -> pageMemoryImpl.stop(true),
+ checkpointManager::stop,
+ filePageStoreManager::stop
+ );
}
}
@Test
- void testSafeToUpdate() throws Exception {
+ void testSafeToUpdate(
+ @InjectConfiguration PageMemoryCheckpointConfiguration checkpointConfig,
+ @WorkDirectory Path workDir
+ ) throws Exception {
+ FilePageStoreManager filePageStoreManager = createFilePageStoreManager(workDir);
+
+ Collection<PageMemoryDataRegion> dataRegions = new ArrayList<>();
+
+ CheckpointManager checkpointManager = createCheckpointManager(checkpointConfig, workDir, filePageStoreManager, dataRegions);
+
long systemPageSize = PAGE_SIZE + PAGE_OVERHEAD;
- dataRegionCfg
- .change(c -> c.changeInitSize(128 * systemPageSize).changeMaxSize(128 * systemPageSize))
- .get(1, SECONDS);
+ dataRegionCfg.change(c -> c.changeInitSize(128 * systemPageSize).changeMaxSize(128 * systemPageSize)).get(1, SECONDS);
- PageMemoryImpl memory = memory(new long[]{100 * systemPageSize, 28 * systemPageSize});
+ PageMemoryImpl pageMemoryImpl = createPageMemoryImpl(
+ new long[]{100 * systemPageSize, 28 * systemPageSize},
+ filePageStoreManager,
+ checkpointManager
+ );
+
+ dataRegions.add(newDataRegion(true, pageMemoryImpl));
+
+ filePageStoreManager.start();
+
+ checkpointManager.start();
- memory.start();
+ pageMemoryImpl.start();
try {
- long maxPages = memory.totalPages();
+ initGroupFilePageStores(filePageStoreManager);
+
+ long maxPages = pageMemoryImpl.totalPages();
long maxDirtyPages = (maxPages * 3 / 4);
assertThat(maxDirtyPages, greaterThanOrEqualTo(50L));
- for (int i = 0; i < maxDirtyPages - 1; i++) {
- allocatePage(memory);
+ checkpointManager.checkpointTimeoutLock().checkpointReadLock();
- assertTrue(memory.safeToUpdate(), "i=" + i);
- }
+ try {
+ for (int i = 0; i < maxDirtyPages - 1; i++) {
+ createDirtyPage(pageMemoryImpl);
- for (int i = (int) maxDirtyPages - 1; i < maxPages; i++) {
- allocatePage(memory);
+ assertTrue(pageMemoryImpl.safeToUpdate(), "i=" + i);
+ }
- assertFalse(memory.safeToUpdate(), "i=" + i);
+ for (int i = (int) maxDirtyPages - 1; i < maxPages; i++) {
+ createDirtyPage(pageMemoryImpl);
+
+ assertFalse(pageMemoryImpl.safeToUpdate(), "i=" + i);
+ }
+ } finally {
+ checkpointManager.checkpointTimeoutLock().checkpointReadUnlock();
}
- // TODO: IGNITE-16984 After the checkpoint check assertTrue(memory.safeToUpdate())
+ checkpointManager
+ .forceCheckpoint("for_test_safe_to_update")
+ .futureFor(FINISHED)
+ .get(100, MILLISECONDS);
+
+ assertTrue(pageMemoryImpl.safeToUpdate());
} finally {
- memory.stop(true);
+ closeAll(
+ () -> pageMemoryImpl.stop(true),
+ checkpointManager::stop,
+ filePageStoreManager::stop
+ );
+ }
+ }
+
+ protected PageMemoryImpl createPageMemoryImpl(
+ long[] sizes,
+ @Nullable FilePageStoreManager filePageStoreManager,
+ @Nullable CheckpointManager checkpointManager
+ ) {
+ PageIoRegistry ioRegistry = new PageIoRegistry();
+
+ ioRegistry.loadFromServiceLoader();
+
+ return new PageMemoryImpl(
+ dataRegionCfg,
+ ioRegistry,
+ sizes,
+ filePageStoreManager == null ? new TestPageReadWriteManager() : filePageStoreManager,
+ null,
+ (fullPageId, buf, tag) -> fail("Should not happen"),
+ checkpointManager == null ? mockCheckpointTimeoutLock(log, true) : checkpointManager.checkpointTimeoutLock(),
+ PAGE_SIZE
+ );
+ }
+
+ protected FullPageId createDirtyPage(PageMemoryImpl pageMemoryImpl) throws Exception {
+ FullPageId fullPageId = allocatePage(pageMemoryImpl);
+
+ long page = pageMemoryImpl.acquirePage(fullPageId.groupId(), fullPageId.pageId());
+
+ try {
+ writePage(pageMemoryImpl, fullPageId, page, 100);
+ } finally {
+ pageMemoryImpl.releasePage(fullPageId.groupId(), fullPageId.pageId(), page);
+ }
+
+ return fullPageId;
+ }
+
+ private static long[] defaultSegmentSizes() {
+ return LongStream.range(0, 10).map(i -> 5 * MiB).toArray();
+ }
+
+ private static CheckpointManager createCheckpointManager(
+ PageMemoryCheckpointConfiguration checkpointConfig,
+ Path storagePath,
+ FilePageStoreManager filePageStoreManager,
+ Collection<PageMemoryDataRegion> dataRegions
+ ) throws Exception {
+ return new CheckpointManager(
+ "test",
+ null,
+ null,
+ checkpointConfig,
+ filePageStoreManager,
+ dataRegions,
+ storagePath,
+ PAGE_SIZE
+ );
+ }
+
+ private static FilePageStoreManager createFilePageStoreManager(Path storagePath) throws Exception {
+ return new FilePageStoreManager(log, "test", storagePath, new RandomAccessFileIoFactory(), PAGE_SIZE);
+ }
+
+ private static void initGroupFilePageStores(FilePageStoreManager filePageStoreManager) throws Exception {
+ filePageStoreManager.initialize("Test", GRP_ID, PARTITION_ID + 1);
+
+ for (FilePageStore filePageStore : filePageStoreManager.getStores(GRP_ID)) {
+ filePageStore.ensure();
}
}
}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManagerTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManagerTest.java
new file mode 100644
index 000000000..874b04c32
--- /dev/null
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManagerTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static org.apache.ignite.internal.pagememory.PageMemoryTestUtils.newDataRegion;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointManager.safeToUpdateAllPageMemories;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
+import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration;
+import org.apache.ignite.internal.pagememory.impl.PageMemoryNoStoreImpl;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * For {@link CheckpointManager} testing.
+ */
+@ExtendWith(ConfigurationExtension.class)
+@ExtendWith(WorkDirectoryExtension.class)
+public class CheckpointManagerTest {
+ @InjectConfiguration
+ private PageMemoryCheckpointConfiguration checkpointConfig;
+
+ @WorkDirectory
+ private Path workDir;
+
+ @Test
+ void testSimple() throws Exception {
+ PageMemoryDataRegion dataRegion = newDataRegion(true, mock(PageMemoryImpl.class));
+
+ CheckpointManager checkpointManager = new CheckpointManager(
+ "test",
+ null,
+ null,
+ checkpointConfig,
+ mock(FilePageStoreManager.class),
+ List.of(dataRegion),
+ workDir,
+ 1024
+ );
+
+ assertDoesNotThrow(checkpointManager::start);
+
+ assertNotNull(checkpointManager.checkpointTimeoutLock());
+
+ CheckpointListener checkpointListener = new CheckpointListener() {
+ };
+
+ assertDoesNotThrow(() -> checkpointManager.addCheckpointListener(checkpointListener, dataRegion));
+ assertDoesNotThrow(() -> checkpointManager.removeCheckpointListener(checkpointListener));
+
+ assertNotNull(checkpointManager.forceCheckpoint("test"));
+ assertNotNull(checkpointManager.forceCheckpoint("test"));
+
+ assertDoesNotThrow(checkpointManager::stop);
+ }
+
+ @Test
+ void testSafeToUpdateAllPageMemories() {
+ PageMemoryDataRegion dataRegion0 = newDataRegion(false, mock(PageMemoryNoStoreImpl.class));
+
+ assertTrue(safeToUpdateAllPageMemories(List.of(dataRegion0)));
+
+ AtomicBoolean safeToUpdate = new AtomicBoolean();
+
+ PageMemoryImpl pageMemoryImpl = mock(PageMemoryImpl.class);
+
+ when(pageMemoryImpl.safeToUpdate()).then(answer -> safeToUpdate.get());
+
+ PageMemoryDataRegion dataRegion1 = newDataRegion(true, pageMemoryImpl);
+
+ assertFalse(safeToUpdateAllPageMemories(List.of(dataRegion1)));
+ assertFalse(safeToUpdateAllPageMemories(List.of(dataRegion0, dataRegion1)));
+
+ safeToUpdate.set(true);
+
+ assertTrue(safeToUpdateAllPageMemories(List.of(dataRegion1)));
+ assertTrue(safeToUpdateAllPageMemories(List.of(dataRegion0, dataRegion1)));
+ }
+}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTestUtils.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTestUtils.java
index 1459eb2ea..bf7ee684a 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTestUtils.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTestUtils.java
@@ -18,16 +18,13 @@
package org.apache.ignite.internal.pagememory.persistence.checkpoint;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
-import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
import org.apache.ignite.lang.IgniteLogger;
/**
* Useful class for testing a checkpoint.
*/
-class CheckpointTestUtils {
+public class CheckpointTestUtils {
/**
* Returns new instance of {@link CheckpointReadWriteLock}.
*
@@ -38,18 +35,26 @@ class CheckpointTestUtils {
}
/**
- * Returns mocked instance of {@link PageMemoryDataRegion}.
+ * Returns mocked {@link CheckpointTimeoutLock}.
*
- * @param persistent Persistent data region.
- * @param pageMemory Persistent page memory.
+ * @param log Logger.
+ * @param checkpointHeldByCurrentThread Result of {@link CheckpointTimeoutLock#checkpointLockIsHeldByThread()}.
*/
- static PageMemoryDataRegion newDataRegion(boolean persistent, PageMemoryImpl pageMemory) {
- PageMemoryDataRegion mock = mock(PageMemoryDataRegion.class);
-
- when(mock.persistent()).thenReturn(persistent);
-
- when(mock.pageMemory()).thenReturn(pageMemory);
-
- return mock;
+ public static CheckpointTimeoutLock mockCheckpointTimeoutLock(IgniteLogger log, boolean checkpointHeldByCurrentThread) {
+ // Do not use "mock(CheckpointTimeoutLock.class)" because calling the CheckpointTimeoutLock.checkpointLockIsHeldByThread
+ // greatly degrades in time, which is critical for ItBPlus*Test (it increases from 2 minutes to 5 minutes).
+ return new CheckpointTimeoutLock(
+ log,
+ mock(CheckpointReadWriteLock.class),
+ Long.MAX_VALUE,
+ () -> true,
+ mock(Checkpointer.class)
+ ) {
+ /** {@inheritDoc} */
+ @Override
+ public boolean checkpointLockIsHeldByThread() {
+ return checkpointHeldByCurrentThread;
+ }
+ };
}
}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
index 3335c432d..71ece07f0 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
@@ -21,13 +21,13 @@ import static java.util.Comparator.comparing;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.pagememory.PageMemoryTestUtils.newDataRegion;
import static org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfigurationSchema.RANDOM_WRITE_ORDER;
import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_TAKEN;
import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.MARKER_STORED_TO_DISK;
import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.PAGE_SNAPSHOT_TAKEN;
-import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointTestUtils.newDataRegion;
import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointTestUtils.newReadWriteLock;
import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.AFTER_CHECKPOINT_END;
import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.BEFORE_CHECKPOINT_BEGIN;
@@ -38,6 +38,7 @@ import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMi
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertSame;
@@ -52,6 +53,7 @@ import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -394,7 +396,7 @@ public class CheckpointWorkflowTest {
when(progressImpl.id()).thenReturn(checkpointId);
- workflow.addCheckpointListener(new TestCheckpointListener(events) {
+ TestCheckpointListener checkpointListener = new TestCheckpointListener(events) {
/** {@inheritDoc} */
@Override
public void afterCheckpointEnd(CheckpointProgress progress) throws IgniteInternalCheckedException {
@@ -410,9 +412,14 @@ public class CheckpointWorkflowTest {
verify(markersStorage, times(1)).onCheckpointEnd(checkpointId);
}
- }, dataRegion);
+ };
- workflow.markCheckpointEnd(new Checkpoint(EMPTY, progressImpl));
+ workflow.addCheckpointListener(checkpointListener, dataRegion);
+
+ workflow.markCheckpointEnd(new Checkpoint(
+ new IgniteConcurrentMultiPairQueue<>(Map.of(pageMemory, List.of(new FullPageId(0, 0)))),
+ progressImpl
+ ));
assertThat(checkpointStateArgumentCaptor.getAllValues(), equalTo(List.of(FINISHED)));
@@ -421,6 +428,14 @@ public class CheckpointWorkflowTest {
verify(progressImpl, times(1)).clearCounters();
verify(pageMemory, times(1)).finishCheckpoint();
+
+ // Checks with empty dirty pages.
+
+ workflow.removeCheckpointListener(checkpointListener);
+
+ assertDoesNotThrow(() -> workflow.markCheckpointEnd(new Checkpoint(EMPTY, progressImpl)));
+
+ verify(markersStorage, times(1)).onCheckpointEnd(checkpointId);
}
private List<IgniteBiTuple<PageMemoryImpl, FullPageId>> collect(IgniteConcurrentMultiPairQueue<PageMemoryImpl, FullPageId> queue) {
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
index b4b2f8c35..1cf94db39 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
@@ -29,7 +29,6 @@ import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -55,7 +54,6 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
-import java.util.function.BiConsumer;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.pagememory.FullPageId;
@@ -201,33 +199,6 @@ public class CheckpointerTest {
assertEquals(scheduledNextCheckpointNanos, currentProgress.nextCheckpointNanos());
assertEquals(scheduledReason, currentProgress.reason());
-
- // Checks the listener.
-
- BiConsumer<Void, Throwable> finishFutureListener = mock(BiConsumer.class);
-
- assertSame(scheduledProgress, checkpointer.scheduleCheckpoint(0, "test4", finishFutureListener));
- assertSame(currentProgress, checkpointer.currentProgress());
- assertSame(scheduledProgress, checkpointer.scheduledProgress());
-
- assertThat(scheduledProgress.nextCheckpointNanos() - nanoTime(), lessThan(0L));
-
- assertEquals("test4", scheduledProgress.reason());
-
- assertEquals(scheduledNextCheckpointNanos, currentProgress.nextCheckpointNanos());
- assertEquals(scheduledReason, currentProgress.reason());
-
- verify(finishFutureListener, times(0)).accept(null, null);
-
- currentProgress.transitTo(FINISHED);
-
- verify(finishFutureListener, times(0)).accept(null, null);
-
- scheduledProgress.transitTo(FINISHED);
-
- verify(finishFutureListener, times(1)).accept(null, null);
-
- verify(checkpointer, times(1)).nextCheckpointInterval();
}
@Test
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java
index b59593424..36af324e7 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java
@@ -22,11 +22,9 @@ import static org.apache.ignite.internal.pagememory.PageIdAllocator.INDEX_PARTIT
import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryDataRegionConfiguration;
-import org.apache.ignite.internal.pagememory.configuration.schema.UnsafeMemoryAllocatorConfiguration;
import org.apache.ignite.internal.pagememory.evict.PageEvictionTrackerNoOp;
import org.apache.ignite.internal.pagememory.impl.PageMemoryNoStoreImpl;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
-import org.apache.ignite.internal.pagememory.mem.unsafe.UnsafeMemoryProvider;
import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
import org.apache.ignite.internal.pagememory.util.PageLockListenerNoOp;
import org.apache.ignite.internal.storage.StorageException;
@@ -59,10 +57,7 @@ class VolatilePageMemoryDataRegion extends AbstractPageMemoryDataRegion {
public void start() {
assert !persistent() : cfg.value().name();
- assert cfg.memoryAllocator() instanceof UnsafeMemoryAllocatorConfiguration : cfg.memoryAllocator();
-
PageMemory pageMemory = new PageMemoryNoStoreImpl(
- new UnsafeMemoryProvider(null),
cfg,
ioRegistry,
pageSize