You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/08/10 09:37:17 UTC
[ignite-3] branch main updated: IGNITE-17077 persistedIndex implementation for PDS (#989)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d8e694eaa IGNITE-17077 persistedIndex implementation for PDS (#989)
d8e694eaa is described below
commit d8e694eaa1c4ff2dd709bb9001b5d32c9ebd43de
Author: ibessonov <be...@gmail.com>
AuthorDate: Wed Aug 10 12:37:13 2022 +0300
IGNITE-17077 persistedIndex implementation for PDS (#989)
---
.../PageMemoryCheckpointConfigurationSchema.java | 5 +
.../pagememory/persistence/PartitionMeta.java | 38 ++++++-
.../persistence/checkpoint/CheckpointManager.java | 11 +++
.../pagememory/persistence/io/PartitionMetaIo.java | 29 +++++-
.../persistence/PartitionMetaManagerTest.java | 2 +-
.../pagememory/persistence/PartitionMetaTest.java | 2 +-
.../persistence/checkpoint/CheckpointerTest.java | 2 +-
.../internal/storage/MvPartitionStorage.java | 8 +-
.../storage/AbstractMvPartitionStorageTest.java | 28 ++++++
.../TestConcurrentHashMapMvPartitionStorage.java | 13 +++
.../pagememory/AbstractPageMemoryTableStorage.java | 2 +-
.../PersistentPageMemoryStorageEngine.java | 9 +-
.../PersistentPageMemoryTableStorage.java | 22 ++++-
.../mv/AbstractPageMemoryMvPartitionStorage.java | 25 +----
.../mv/PersistentPageMemoryMvPartitionStorage.java | 110 +++++++++++++++++++--
.../mv/VolatilePageMemoryMvPartitionStorage.java | 37 +++++++
...PersistentPageMemoryMvPartitionStorageTest.java | 5 +-
.../RocksDbStorageEngineConfigurationSchema.java | 3 +
.../rocksdb/RocksDbMvPartitionStorageTest.java | 5 +-
19 files changed, 304 insertions(+), 52 deletions(-)
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java
index 2352b360a..69dbe516a 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java
@@ -36,6 +36,11 @@ public class PageMemoryCheckpointConfigurationSchema {
@Value(hasDefault = true)
public int frequencyDeviation = 40;
+ /** Delay before executing a checkpoint triggered by RAFT. */
+ @Range(min = 0)
+ @Value(hasDefault = true)
+ public int checkpointDelayMillis = 200;
+
/** Number of checkpoint threads. */
@Range(min = 1)
@Value(hasDefault = true)
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMeta.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMeta.java
index 7720cf23a..1b8c9aad1 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMeta.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMeta.java
@@ -45,6 +45,8 @@ public class PartitionMeta {
}
}
+ private volatile long lastAppliedIndex;
+
// TODO: IGNITE-17466 Delete it
private volatile long treeRootPageId;
@@ -72,6 +74,7 @@ public class PartitionMeta {
* Constructor.
*
* @param checkpointId Checkpoint ID.
+ * @param lastAppliedIndex Last applied index value.
* @param treeRootPageId Tree root page ID.
* @param reuseListRootPageId Reuse list root page ID.
* @param versionChainTreeRootPageId Version chain tree root page ID.
@@ -81,6 +84,7 @@ public class PartitionMeta {
*/
public PartitionMeta(
@Nullable UUID checkpointId,
+ long lastAppliedIndex,
long treeRootPageId,
long reuseListRootPageId,
long versionChainTreeRootPageId,
@@ -88,6 +92,7 @@ public class PartitionMeta {
long rowVersionFreeListRootPageId,
int pageCount
) {
+ this.lastAppliedIndex = lastAppliedIndex;
this.treeRootPageId = treeRootPageId;
this.reuseListRootPageId = reuseListRootPageId;
this.versionChainTreeRootPageId = versionChainTreeRootPageId;
@@ -108,6 +113,7 @@ public class PartitionMeta {
PartitionMeta(@Nullable UUID checkpointId, PartitionMetaIo metaIo, long pageAddr) {
this(
checkpointId,
+ metaIo.getLastAppliedIndex(pageAddr),
metaIo.getTreeRootPageId(pageAddr),
metaIo.getReuseListRootPageId(pageAddr),
metaIo.getVersionChainTreeRootPageId(pageAddr),
@@ -117,6 +123,25 @@ public class PartitionMeta {
);
}
+ /**
+ * Returns the last applied index value.
+ */
+ public long lastAppliedIndex() {
+ return lastAppliedIndex;
+ }
+
+ /**
+ * Sets the last applied index value.
+ *
+ * @param checkpointId Checkpoint ID.
+ * @param lastAppliedIndex Last applied index value.
+ */
+ public void lastAppliedIndex(@Nullable UUID checkpointId, long lastAppliedIndex) {
+ updateSnapshot(checkpointId);
+
+ this.lastAppliedIndex = lastAppliedIndex;
+ }
+
/**
* Returns tree root page ID.
*/
@@ -246,7 +271,7 @@ public class PartitionMeta {
* @param checkpointId Checkpoint ID.
*/
private void updateSnapshot(@Nullable UUID checkpointId) {
- PartitionMetaSnapshot current = this.metaSnapshot;
+ PartitionMetaSnapshot current = metaSnapshot;
if (current.checkpointId != checkpointId) {
META_SNAPSHOT.compareAndSet(this, current, new PartitionMetaSnapshot(checkpointId, this));
@@ -265,6 +290,8 @@ public class PartitionMeta {
public static class PartitionMetaSnapshot {
private final @Nullable UUID checkpointId;
+ private final long lastAppliedIndex;
+
private final long treeRootPageId;
private final long reuseListRootPageId;
@@ -285,6 +312,7 @@ public class PartitionMeta {
*/
private PartitionMetaSnapshot(@Nullable UUID checkpointId, PartitionMeta partitionMeta) {
this.checkpointId = checkpointId;
+ this.lastAppliedIndex = partitionMeta.lastAppliedIndex;
this.treeRootPageId = partitionMeta.treeRootPageId;
this.reuseListRootPageId = partitionMeta.reuseListRootPageId;
this.versionChainTreeRootPageId = partitionMeta.versionChainTreeRootPageId;
@@ -293,6 +321,13 @@ public class PartitionMeta {
this.pageCount = partitionMeta.pageCount;
}
+ /**
+ * Returns the last applied index value.
+ */
+ public long lastAppliedIndex() {
+ return lastAppliedIndex;
+ }
+
/**
* Returns tree root page ID.
*/
@@ -342,6 +377,7 @@ public class PartitionMeta {
* @param pageAddr Address of the page with the partition meta.
*/
void writeTo(PartitionMetaIo metaIo, long pageAddr) {
+ metaIo.setLastAppliedIndex(pageAddr, lastAppliedIndex);
metaIo.setTreeRootPageId(pageAddr, treeRootPageId);
metaIo.setReuseListRootPageId(pageAddr, reuseListRootPageId);
metaIo.setVersionChainTreeRootPageId(pageAddr, versionChainTreeRootPageId);
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
index 2af3d2956..b23a253a2 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
@@ -228,6 +228,17 @@ public class CheckpointManager {
return checkpointer.scheduleCheckpoint(0, reason);
}
+ /**
+ * Schedules a checkpoint in the future.
+ *
+ * @param delayMillis Delay in milliseconds from the current moment.
+ * @param reason Checkpoint reason.
+ * @return Triggered checkpoint progress.
+ */
+ public CheckpointProgress scheduleCheckpoint(long delayMillis, String reason) {
+ return checkpointer.scheduleCheckpoint(delayMillis, reason);
+ }
+
/**
* Returns the progress of the last checkpoint, or the current checkpoint if in progress, {@code null} if no checkpoint has occurred.
*/
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/io/PartitionMetaIo.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/io/PartitionMetaIo.java
index c86a7f45f..5e3baa6a2 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/io/PartitionMetaIo.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/io/PartitionMetaIo.java
@@ -31,7 +31,9 @@ import org.apache.ignite.lang.IgniteStringBuilder;
* Io for partition metadata pages.
*/
public class PartitionMetaIo extends PageIo {
- private static final int TREE_ROOT_PAGE_ID_OFF = COMMON_HEADER_END;
+ private static final int LAST_APPLIED_INDEX_OFF = COMMON_HEADER_END;
+
+ private static final int TREE_ROOT_PAGE_ID_OFF = LAST_APPLIED_INDEX_OFF + Long.BYTES;
private static final int REUSE_LIST_ROOT_PAGE_ID_OFF = TREE_ROOT_PAGE_ID_OFF + Long.BYTES;
@@ -63,6 +65,7 @@ public class PartitionMetaIo extends PageIo {
public void initNewPage(long pageAddr, long pageId, int pageSize) {
super.initNewPage(pageAddr, pageId, pageSize);
+ setLastAppliedIndex(pageAddr, 0);
setTreeRootPageId(pageAddr, 0);
setReuseListRootPageId(pageAddr, 0);
setVersionChainTreeRootPageId(pageAddr, 0);
@@ -71,6 +74,27 @@ public class PartitionMetaIo extends PageIo {
setPageCount(pageAddr, 0);
}
+ /**
+ * Sets the last applied index value.
+ *
+ * @param pageAddr Page address.
+ * @param lastAppliedIndex Last applied index value.
+ */
+ public void setLastAppliedIndex(long pageAddr, long lastAppliedIndex) {
+ assertPageType(pageAddr);
+
+ putLong(pageAddr, LAST_APPLIED_INDEX_OFF, lastAppliedIndex);
+ }
+
+ /**
+ * Returns the last applied index value.
+ *
+ * @param pageAddr Page address.
+ */
+ public long getLastAppliedIndex(long pageAddr) {
+ return getLong(pageAddr, LAST_APPLIED_INDEX_OFF);
+ }
+
/**
* Sets tree root page ID.
*
@@ -205,7 +229,8 @@ public class PartitionMetaIo extends PageIo {
@Override
protected void printPage(long addr, int pageSize, IgniteStringBuilder sb) {
sb.app("TablePartitionMeta [").nl()
- .app("treeRootPageId=").appendHex(getTreeRootPageId(addr)).nl()
+ .app("lastAppliedIndex=").app(getLastAppliedIndex(addr)).nl()
+ .app(", treeRootPageId=").appendHex(getTreeRootPageId(addr)).nl()
.app(", reuseListRootPageId=").appendHex(getReuseListRootPageId(addr)).nl()
.app(", versionChainTreeRootPageId=").appendHex(getVersionChainTreeRootPageId(addr)).nl()
.app(", versionChainFreeListRootPageId=").appendHex(getVersionChainFreeListRootPageId(addr)).nl()
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManagerTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManagerTest.java
index 974a65378..4ca28e2fb 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManagerTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManagerTest.java
@@ -139,7 +139,7 @@ public class PartitionMetaManagerTest {
try (FilePageStore filePageStore = createFilePageStore(testFilePath)) {
manager.writeMetaToBuffer(
partId,
- new PartitionMeta(UUID.randomUUID(), 200, 1000, 300, 600, 900, 4).metaSnapshot(null),
+ new PartitionMeta(UUID.randomUUID(), 0, 200, 1000, 300, 600, 900, 4).metaSnapshot(null),
buffer.rewind()
);
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaTest.java
index 9b21b7e7f..69e82272b 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaTest.java
@@ -129,7 +129,7 @@ public class PartitionMetaTest {
void testSnapshot() {
UUID checkpointId = null;
- PartitionMeta meta = new PartitionMeta(checkpointId, 0, 0, 0, 0, 0, 0);
+ PartitionMeta meta = new PartitionMeta(checkpointId, 0, 0, 0, 0, 0, 0, 0);
checkSnapshot(meta.metaSnapshot(checkpointId), 0, 0, 0, 0, 0, 0);
checkSnapshot(meta.metaSnapshot(checkpointId = UUID.randomUUID()), 0, 0, 0, 0, 0, 0);
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
index 6a47e317f..fb4710434 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
@@ -351,7 +351,7 @@ public class CheckpointerTest {
partitionMetaManager.addMeta(
new GroupPartitionId(0, 0),
- new PartitionMeta(null, 0, 0, 0, 0, 0, 3)
+ new PartitionMeta(null, 0, 0, 0, 0, 0, 0, 3)
);
FilePageStore filePageStore = mock(FilePageStore.class);
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index ed59c62ea..3f540f6d4 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -52,9 +52,7 @@ public interface MvPartitionStorage extends AutoCloseable {
* @return Closure result.
* @throws StorageException If failed to write data to the storage.
*/
- default <V> V runConsistently(WriteClosure<V> closure) throws StorageException {
- return closure.execute();
- }
+ <V> V runConsistently(WriteClosure<V> closure) throws StorageException;
/**
* Flushes current state of the data or <i>the state from the nearest future</i> to the storage. It means that the future can be
@@ -63,9 +61,7 @@ public interface MvPartitionStorage extends AutoCloseable {
*
* @return Future that's completed when flushing of the data is completed.
*/
- default CompletableFuture<Void> flush() {
- return CompletableFuture.completedFuture(null);
- }
+ CompletableFuture<Void> flush();
/**
* Index of the highest write command applied to the storage. {@code 0} if index is unknown.
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 6e8cfc48a..b346b859b 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -18,10 +18,12 @@
package org.apache.ignite.internal.storage;
import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -35,6 +37,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.stream.StreamSupport;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -679,4 +682,29 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
assertThrows(TxIdMismatchException.class, cursor::next);
}
+
+ /**
+ * Tests that a changed {@link MvPartitionStorage#lastAppliedIndex()} can be successfully read and that it is returned from
+ * {@link MvPartitionStorage#persistedIndex()} once {@link MvPartitionStorage#flush()} completes.
+ */
+ @Test
+ void testAppliedIndex() {
+ storage.runConsistently(() -> {
+ assertEquals(0, storage.lastAppliedIndex());
+ assertEquals(0, storage.persistedIndex());
+
+ storage.lastAppliedIndex(1);
+
+ assertEquals(1, storage.lastAppliedIndex());
+ assertThat(storage.persistedIndex(), is(lessThanOrEqualTo(1L)));
+
+ return null;
+ });
+
+ CompletableFuture<Void> flushFuture = storage.flush();
+
+ assertThat(flushFuture, willCompleteSuccessfully());
+
+ assertEquals(1, storage.persistedIndex());
+ }
}
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
index 5716d8a27..2677c70b2 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
@@ -21,6 +21,7 @@ import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
@@ -74,6 +75,18 @@ public class TestConcurrentHashMapMvPartitionStorage implements MvPartitionStora
}
}
+ /** {@inheritDoc} */
+ @Override
+ public <V> V runConsistently(WriteClosure<V> closure) throws StorageException {
+ return closure.execute();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<Void> flush() {
+ return CompletableFuture.completedFuture(null);
+ }
+
/** {@inheritDoc} */
@Override
public long lastAppliedIndex() {
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
index 36190248a..6210ba4c9 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
@@ -58,7 +58,7 @@ public abstract class AbstractPageMemoryTableStorage implements TableStorage, Mv
*
* @param tableCfg Table configuration.
*/
- public AbstractPageMemoryTableStorage(TableConfiguration tableCfg) {
+ protected AbstractPageMemoryTableStorage(TableConfiguration tableCfg) {
this.tableCfg = tableCfg;
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
index ee80383e7..0eafd238b 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
@@ -101,6 +101,13 @@ public class PersistentPageMemoryStorageEngine implements StorageEngine {
this.longJvmPauseDetector = longJvmPauseDetector;
}
+ /**
+ * Returns a storage engine configuration.
+ */
+ public PersistentPageMemoryStorageEngineConfiguration configuration() {
+ return engineConfig;
+ }
+
/** {@inheritDoc} */
@Override
public void start() throws StorageException {
@@ -188,7 +195,7 @@ public class PersistentPageMemoryStorageEngine implements StorageEngine {
PersistentPageMemoryDataStorageView dataStorageView = (PersistentPageMemoryDataStorageView) tableView.dataStorage();
- return new PersistentPageMemoryTableStorage(tableCfg, regions.get(dataStorageView.dataRegion()));
+ return new PersistentPageMemoryTableStorage(this, tableCfg, regions.get(dataStorageView.dataRegion()));
}
/** {@inheritDoc} */
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
index 5add88f72..f578728bc 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
@@ -45,23 +45,37 @@ import org.apache.ignite.lang.IgniteInternalCheckedException;
* Implementation of {@link AbstractPageMemoryTableStorage} for persistent case.
*/
public class PersistentPageMemoryTableStorage extends AbstractPageMemoryTableStorage {
+ /** Storage engine instance. */
+ private final PersistentPageMemoryStorageEngine engine;
+
+ /** Data region instance. */
private final PersistentPageMemoryDataRegion dataRegion;
/**
* Constructor.
*
+ * @param engine Storage engine instance.
* @param tableCfg Table configuration.
* @param dataRegion Data region for the table.
*/
public PersistentPageMemoryTableStorage(
+ PersistentPageMemoryStorageEngine engine,
TableConfiguration tableCfg,
PersistentPageMemoryDataRegion dataRegion
) {
super(tableCfg);
+ this.engine = engine;
this.dataRegion = dataRegion;
}
+ /**
+ * Returns a storage engine instance.
+ */
+ public PersistentPageMemoryStorageEngine engine() {
+ return engine;
+ }
+
/** {@inheritDoc} */
@Override
public boolean isVolatile() {
@@ -259,13 +273,15 @@ public class PersistentPageMemoryTableStorage extends AbstractPageMemoryTableSto
);
return new PersistentPageMemoryMvPartitionStorage(
+ this,
partitionId,
tableView,
- dataRegion.pageMemory(),
+ dataRegion,
+ checkpointManager,
+ meta,
versionChainFreeList,
rowVersionFreeList,
- versionChainTree,
- checkpointTimeoutLock
+ versionChainTree
);
} catch (IgniteInternalCheckedException e) {
throw new StorageException(
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 904251a3a..b9b1e5773 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -67,11 +67,6 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
ScanVersionChainByTimestamp::new
);
- /**
- * Last applied index value.
- */
- private volatile long lastAppliedIndex = 0;
-
/**
* Constructor.
*
@@ -82,7 +77,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
* @param rowVersionFreeList Free list for {@link RowVersion}.
* @param versionChainTree Table tree for {@link VersionChain}.
*/
- public AbstractPageMemoryMvPartitionStorage(
+ protected AbstractPageMemoryMvPartitionStorage(
int partId,
TableView tableView,
PageMemory pageMemory,
@@ -102,24 +97,6 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
rowVersionDataPageReader = new DataPageReader(pageMemory, groupId, IoStatisticsHolderNoOp.INSTANCE);
}
- /** {@inheritDoc} */
- @Override
- public long lastAppliedIndex() {
- return lastAppliedIndex;
- }
-
- /** {@inheritDoc} */
- @Override
- public void lastAppliedIndex(long lastAppliedIndex) throws StorageException {
- this.lastAppliedIndex = lastAppliedIndex;
- }
-
- /** {@inheritDoc} */
- @Override
- public long persistedIndex() {
- return lastAppliedIndex;
- }
-
/** {@inheritDoc} */
@Override
public @Nullable BinaryRow read(RowId rowId, UUID txId) throws TxIdMismatchException, StorageException {
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
index 70ff1fd28..945e1e0a9 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
@@ -17,42 +17,85 @@
package org.apache.ignite.internal.storage.pagememory.mv;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.pagememory.DataRegion;
+import org.apache.ignite.internal.pagememory.persistence.PartitionMeta;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointManager;
+import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress;
+import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState;
import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointTimeoutLock;
import org.apache.ignite.internal.pagememory.tree.BplusTree;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryTableStorage;
+import org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineView;
/**
* Implementation of {@link MvPartitionStorage} based on a {@link BplusTree} for persistent case.
*/
public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMvPartitionStorage {
+ /** Table storage instance. */
+ private final PersistentPageMemoryTableStorage tableStorage;
+
+ /** Checkpoint manager instance. */
+ private final CheckpointManager checkpointManager;
+
+ /** Checkpoint lock instance. */
private final CheckpointTimeoutLock checkpointTimeoutLock;
+ /** Partition meta instance. */
+ private final PartitionMeta meta;
+
+ /** Value of currently persisted last applied index. */
+ private volatile long persistedIndex;
+
+ /** Checkpoint listener that updates the value of {@link #persistedIndex}. */
+ private final CheckpointListener checkpointListener = new CheckpointListener() {
+ /** {@inheritDoc} */
+ @Override
+ public void afterCheckpointEnd(CheckpointProgress progress) {
+ persistedIndex = meta.metaSnapshot(progress.id()).lastAppliedIndex();
+ }
+ };
+
/**
* Constructor.
*
- * @param partId Partition id.
+ * @param tableStorage Table storage.
+ * @param partitionId Partition id.
* @param tableView Table configuration.
- * @param pageMemory Page memory.
+ * @param dataRegion Data region.
+ * @param checkpointManager Checkpoint manager.
+ * @param meta Partition meta.
* @param versionChainFreeList Free list for {@link VersionChain}.
* @param rowVersionFreeList Free list for {@link RowVersion}.
* @param versionChainTree Table tree for {@link VersionChain}.
- * @param checkpointTimeoutLock Checkpoint timeout lock.
*/
public PersistentPageMemoryMvPartitionStorage(
- int partId,
+ PersistentPageMemoryTableStorage tableStorage,
+ int partitionId,
TableView tableView,
- PersistentPageMemory pageMemory,
+ DataRegion<PersistentPageMemory> dataRegion,
+ CheckpointManager checkpointManager,
+ PartitionMeta meta,
VersionChainFreeList versionChainFreeList,
RowVersionFreeList rowVersionFreeList,
- VersionChainTree versionChainTree,
- CheckpointTimeoutLock checkpointTimeoutLock
+ VersionChainTree versionChainTree
) {
- super(partId, tableView, pageMemory, versionChainFreeList, rowVersionFreeList, versionChainTree);
+ super(partitionId, tableView, dataRegion.pageMemory(), versionChainFreeList, rowVersionFreeList, versionChainTree);
+
+ this.tableStorage = tableStorage;
+
+ this.checkpointManager = checkpointManager;
+ checkpointTimeoutLock = checkpointManager.checkpointTimeoutLock();
- this.checkpointTimeoutLock = checkpointTimeoutLock;
+ checkpointManager.addCheckpointListener(checkpointListener, dataRegion);
+
+ this.meta = meta;
}
/** {@inheritDoc} */
@@ -66,4 +109,53 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
checkpointTimeoutLock.checkpointReadUnlock();
}
}
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<Void> flush() {
+ CheckpointProgress lastCheckpoint = checkpointManager.lastCheckpointProgress();
+
+ CheckpointProgress scheduledCheckpoint;
+
+ if (lastCheckpoint != null && meta.metaSnapshot(lastCheckpoint.id()).lastAppliedIndex() == meta.lastAppliedIndex()) {
+ scheduledCheckpoint = lastCheckpoint;
+ } else {
+ PersistentPageMemoryStorageEngineView engineCfg = tableStorage.engine().configuration().value();
+
+ int checkpointDelayMillis = engineCfg.checkpoint().checkpointDelayMillis();
+ scheduledCheckpoint = checkpointManager.scheduleCheckpoint(checkpointDelayMillis, "Triggered by replicator");
+ }
+
+ return scheduledCheckpoint.futureFor(CheckpointState.FINISHED).thenApply(res -> null);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long lastAppliedIndex() {
+ return meta.lastAppliedIndex();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void lastAppliedIndex(long lastAppliedIndex) throws StorageException {
+ assert checkpointTimeoutLock.checkpointLockIsHeldByThread();
+
+ CheckpointProgress lastCheckpoint = checkpointManager.lastCheckpointProgress();
+
+ UUID lastCheckpointId = lastCheckpoint == null ? null : lastCheckpoint.id();
+
+ meta.lastAppliedIndex(lastCheckpointId, lastAppliedIndex);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long persistedIndex() {
+ return persistedIndex;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close() {
+ checkpointManager.removeCheckpointListener(checkpointListener);
+ }
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
index c076d1ccd..ef49c7989 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
@@ -17,15 +17,22 @@
package org.apache.ignite.internal.storage.pagememory.mv;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.configuration.schemas.table.TableView;
import org.apache.ignite.internal.pagememory.inmemory.VolatilePageMemory;
import org.apache.ignite.internal.pagememory.tree.BplusTree;
import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.StorageException;
/**
* Implementation of {@link MvPartitionStorage} based on a {@link BplusTree} for in-memory case.
*/
public class VolatilePageMemoryMvPartitionStorage extends AbstractPageMemoryMvPartitionStorage {
+ /**
+ * Last applied index value.
+ */
+ private volatile long lastAppliedIndex;
+
/**
* Constructor.
*
@@ -46,4 +53,34 @@ public class VolatilePageMemoryMvPartitionStorage extends AbstractPageMemoryMvPa
) {
super(partId, tableView, pageMemory, versionChainFreeList, rowVersionFreeList, versionChainTree);
}
+
+ /** {@inheritDoc} Executes the closure directly on the calling thread; no additional locking is taken. */
+ @Override
+ public <V> V runConsistently(WriteClosure<V> closure) throws StorageException {
+ return closure.execute();
+ }
+
+ /** {@inheritDoc} Always returns an already completed future: in-memory storage has nothing to flush. */
+ @Override
+ public CompletableFuture<Void> flush() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ /** {@inheritDoc} Reads the in-memory {@code lastAppliedIndex} field. */
+ @Override
+ public long lastAppliedIndex() {
+ return lastAppliedIndex;
+ }
+
+ /** {@inheritDoc} Only overwrites the in-memory field; no lock is checked. */
+ @Override
+ public void lastAppliedIndex(long lastAppliedIndex) throws StorageException {
+ this.lastAppliedIndex = lastAppliedIndex;
+ }
+
+ /** {@inheritDoc} Same value as {@link #lastAppliedIndex()}, since in-memory storage has no separate durable state. */
+ @Override
+ public long persistedIndex() {
+ return lastAppliedIndex;
+ }
}
diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
index 23f677003..3058d4670 100644
--- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
+++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
@@ -45,7 +45,10 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class PersistentPageMemoryMvPartitionStorageTest extends AbstractPageMemoryMvPartitionStorageTest {
- @InjectConfiguration(polymorphicExtensions = UnsafeMemoryAllocatorConfigurationSchema.class)
+ @InjectConfiguration(
+ value = "mock.checkpoint.checkpointDelayMillis = 0",
+ polymorphicExtensions = UnsafeMemoryAllocatorConfigurationSchema.class
+ )
private PersistentPageMemoryStorageEngineConfiguration engineConfig;
@InjectConfiguration(
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/configuration/schema/RocksDbStorageEngineConfigurationSchema.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/configuration/schema/RocksDbStorageEngineConfigurationSchema.java
index 5e1c75b48..34cc74084 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/configuration/schema/RocksDbStorageEngineConfigurationSchema.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/configuration/schema/RocksDbStorageEngineConfigurationSchema.java
@@ -25,6 +25,7 @@ import org.apache.ignite.configuration.annotation.Name;
import org.apache.ignite.configuration.annotation.NamedConfigValue;
import org.apache.ignite.configuration.annotation.Value;
import org.apache.ignite.configuration.validation.ExceptKeys;
+import org.apache.ignite.configuration.validation.Range;
import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
/**
@@ -35,6 +36,8 @@ public class RocksDbStorageEngineConfigurationSchema {
/** Name of the default data region. */
public static final String DEFAULT_DATA_REGION_NAME = "default";
+ /** Delay, in milliseconds, before executing a flush triggered by RAFT. Must be non-negative. */
+ @Range(min = 0)
@Value(hasDefault = true)
public int flushDelayMillis = 100;
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
index 0291ea553..b1ec8a7bc 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
@@ -73,7 +73,10 @@ public class RocksDbMvPartitionStorageTest extends AbstractMvPartitionStorageTes
assertThat(((RocksDbDataStorageView) tableCfg.dataStorage().value()).dataRegion(), equalTo(DEFAULT_DATA_REGION_NAME));
- engineConfig.defaultRegion().change(c -> c.changeSize(16 * 1024).changeWriteBufferSize(16 * 1024)).get(1, SECONDS);
+ engineConfig.change(cfg -> cfg
+ .changeFlushDelayMillis(0)
+ .changeDefaultRegion(c -> c.changeSize(16 * 1024).changeWriteBufferSize(16 * 1024))
+ ).get(1, SECONDS);
engine = new RocksDbStorageEngine(engineConfig, workDir);