You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2023/01/13 10:51:01 UTC
[ignite-3] branch main updated: IGNITE-17833 Implement partition destruction for volatile PageMemory (#1514)
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 1c3c006b96 IGNITE-17833 Implement partition destruction for volatile PageMemory (#1514)
1c3c006b96 is described below
commit 1c3c006b965da64ac95bf28bde8b1abe6988b8f5
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Fri Jan 13 14:50:56 2023 +0400
IGNITE-17833 Implement partition destruction for volatile PageMemory (#1514)
---
modules/page-memory/build.gradle | 1 +
.../pagememory/datastructure/DataStructure.java | 4 +-
.../ignite/internal/pagememory/tree/BplusTree.java | 237 ++++++++++++++++++++-
.../pagememory/util/CompletedGradualTask.java} | 23 +-
.../internal/pagememory/util/GradualTask.java | 60 ++++++
.../pagememory/util/GradualTaskExecutor.java | 105 +++++++++
.../pagememory/util/GradualTaskExecutorTest.java | 128 +++++++++++
.../org/apache/ignite/internal/storage/RowId.java | 4 +
.../storage/AbstractMvTableStorageTest.java | 53 ++++-
.../internal/storage/impl/TestMvTableStorage.java | 11 +-
.../pagememory/AbstractPageMemoryTableStorage.java | 41 ++++
.../PersistentPageMemoryTableStorage.java | 38 +---
.../pagememory/VolatilePageMemoryDataRegion.java | 25 ++-
.../VolatilePageMemoryDataStorageModule.java | 3 +-
.../VolatilePageMemoryStorageEngine.java | 45 +++-
.../pagememory/VolatilePageMemoryTableStorage.java | 35 +--
.../index/hash/PageMemoryHashIndexStorage.java | 4 +-
.../index/sorted/PageMemorySortedIndexStorage.java | 4 +-
.../mv/AbstractPageMemoryMvPartitionStorage.java | 10 +-
.../mv/PersistentPageMemoryMvPartitionStorage.java | 2 +
.../mv/VolatilePageMemoryMvPartitionStorage.java | 113 +++++++++-
.../pagememory/mv/io/VersionChainLeafIo.java | 23 +-
.../VolatilePageMemoryMvTableStorageTest.java | 82 +++++--
.../VolatilePageMemoryHashIndexStorageTest.java | 3 +-
.../VolatilePageMemorySortedIndexStorageTest.java | 4 +-
.../storage/pagememory/mv/BlobStorageTest.java | 10 -
...ageMemoryMvPartitionStorageConcurrencyTest.java | 3 +-
.../VolatilePageMemoryMvPartitionStorageTest.java | 3 +-
.../storage/rocksdb/RocksDbMvTableStorageTest.java | 10 +
29 files changed, 948 insertions(+), 136 deletions(-)
diff --git a/modules/page-memory/build.gradle b/modules/page-memory/build.gradle
index c309eeb614..50d47ea75b 100644
--- a/modules/page-memory/build.gradle
+++ b/modules/page-memory/build.gradle
@@ -39,6 +39,7 @@ dependencies {
testImplementation project(':ignite-configuration')
testImplementation(testFixtures(project(':ignite-configuration')))
testImplementation libs.mockito.core
+ testImplementation libs.mockito.junit
testImplementation libs.hamcrest.core
testImplementation libs.auto.service.annotations
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datastructure/DataStructure.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datastructure/DataStructure.java
index ca2758a13c..3b937ec707 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datastructure/DataStructure.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datastructure/DataStructure.java
@@ -27,6 +27,7 @@ import static org.apache.ignite.internal.pagememory.util.PageIdUtils.toDetailStr
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.pagememory.FullPageId;
import org.apache.ignite.internal.pagememory.PageIdAllocator;
import org.apache.ignite.internal.pagememory.PageMemory;
@@ -46,7 +47,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Base class for all the data structures based on {@link PageMemory}.
*/
-public abstract class DataStructure {
+public abstract class DataStructure implements ManuallyCloseable {
/** For tests. */
// TODO: https://issues.apache.org/jira/browse/IGNITE-16350
public static Random rnd;
@@ -470,6 +471,7 @@ public abstract class DataStructure {
/**
* Frees the resources allocated by this structure.
*/
+ @Override
public void close() {
lockLsnr.close();
}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
index c601025f84..b0e1172866 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.pagememory.tree.io.BplusInnerIo;
import org.apache.ignite.internal.pagememory.tree.io.BplusIo;
import org.apache.ignite.internal.pagememory.tree.io.BplusLeafIo;
import org.apache.ignite.internal.pagememory.tree.io.BplusMetaIo;
+import org.apache.ignite.internal.pagememory.util.GradualTask;
import org.apache.ignite.internal.pagememory.util.PageHandler;
import org.apache.ignite.internal.pagememory.util.PageLockListener;
import org.apache.ignite.internal.tostring.S;
@@ -2742,11 +2743,15 @@ public abstract class BplusTree<L, T extends L> extends DataStructure implements
releasePage(metaPageId, metaPage);
}
+ addForRecycle(bag);
+
+ return pagesCnt;
+ }
+
+ private void addForRecycle(LongListReuseBag bag) throws IgniteInternalCheckedException {
reuseList.addForRecycle(bag);
assert bag.isEmpty() : bag.size();
-
- return pagesCnt;
}
/**
@@ -2845,14 +2850,72 @@ public abstract class BplusTree<L, T extends L> extends DataStructure implements
}
if (bag.size() == 128) {
- reuseList.addForRecycle(bag);
-
- assert bag.isEmpty() : bag.size();
+ addForRecycle(bag);
}
return pagesCnt;
}
+ /**
+ * Starts gradual destruction, that is, closes the tree, recycles its meta page, and returns a {@link GradualTask}
+ * that, when executed by a {@link org.apache.ignite.internal.pagememory.util.GradualTaskExecutor}, gradually destroys
+ * the tree.
+ *
+ * <p>This method is allowed to be invoked only when the tree is out of use (no concurrent operations are trying to read or
+ * update the tree once destruction has begun).
+ *
+ * @param c Visitor closure. Visits only leaf pages.
+ * @param forceDestroy Whether to proceed with destroying, even if tree is already marked as destroyed (see {@link #markDestroyed()}).
+ * @return GradualTask that will destroy the tree; it is the responsibility of a caller to pass this task for
+ * execution to a {@link org.apache.ignite.internal.pagememory.util.GradualTaskExecutor}.
+ * @throws IgniteInternalCheckedException If failed.
+ */
+ public final GradualTask startGradualDestruction(@Nullable Consumer<L> c, boolean forceDestroy) throws IgniteInternalCheckedException {
+ close();
+
+ if (!markDestroyed() && !forceDestroy) {
+ return GradualTask.completed();
+ }
+
+ if (reuseList == null) {
+ return GradualTask.completed();
+ }
+
+ LongListReuseBag bag = new LongListReuseBag();
+
+ RootPageIdAndLevel rootPageIdAndLevel = detachMetaPage(bag);
+
+ return new DestroyTreeTask(bag, c, rootPageIdAndLevel.level, rootPageIdAndLevel.pageId);
+ }
+
+ private RootPageIdAndLevel detachMetaPage(LongListReuseBag bag) throws IgniteInternalCheckedException {
+ long metaPage = acquirePage(metaPageId);
+
+ try {
+ long metaPageAddr = writeLock(metaPageId, metaPage); // No checks, we must be out of use.
+
+ try {
+ assert metaPageAddr != 0L;
+
+ int rootLvl = getRootLevel(metaPageAddr);
+
+ if (rootLvl < 0) {
+ fail("Root level: " + rootLvl);
+ }
+
+ long rootPageId = getFirstPageId(metaPageId, metaPage, rootLvl, metaPageAddr);
+
+ bag.addFreePage(recyclePage(metaPageId, metaPageAddr));
+
+ return new RootPageIdAndLevel(rootPageId, rootLvl);
+ } finally {
+ writeUnlock(metaPageId, metaPage, metaPageAddr, true);
+ }
+ } finally {
+ releasePage(metaPageId, metaPage);
+ }
+ }
+
/**
* Marks the tree as destroyed.
*
@@ -6576,4 +6639,168 @@ public abstract class BplusTree<L, T extends L> extends DataStructure implements
+ getClass().getSimpleName() + " [grpName=" + grpName + ", treeName=" + name() + ", metaPageId="
+ hexLong(metaPageId) + "].";
}
+
+ private static class RootPageIdAndLevel {
+ private final long pageId;
+ private final int level;
+
+ private RootPageIdAndLevel(long pageId, int level) {
+ this.pageId = pageId;
+ this.level = level;
+ }
+ }
+
+ private class DestroyTreeTask implements GradualTask {
+ /**
+ * Number of work units to execute per step. Recycling of a node counts as 1 work unit; visiting an item
+ * using a Consumer also counts as 1 work unit per item.
+ */
+ private static final int WORK_UNITS_PER_STEP = 1_000_000;
+
+ private final LongListReuseBag bag;
+ private final @Nullable Consumer<L> actOnEachElement;
+ private final int rootLevel;
+
+ /** IDs of pages contained in inner pages on each level. First index is level. */
+ private final long[][] childrenPageIds;
+
+ /** Indices of current page ID (among {@link #childrenPageIds}) on each level. Indexed by level. */
+ private final int[] currentChildIndices;
+
+ /** Level on which we currently are. */
+ private int currentLevel;
+
+ /** ID of the page that we are going to process (that is, recycle it after reading its contents) next. */
+ private long currentPageId;
+
+ private boolean finished = false;
+
+ private DestroyTreeTask(LongListReuseBag bag, @Nullable Consumer<L> actOnEachElement, int rootLevel, long rootPageId) {
+ this.bag = bag;
+ this.actOnEachElement = actOnEachElement;
+ this.rootLevel = rootLevel;
+
+ childrenPageIds = new long[rootLevel + 1][];
+ currentChildIndices = new int[rootLevel + 1];
+
+ currentLevel = rootLevel;
+ currentPageId = rootPageId;
+ }
+
+ @Override
+ public void runStep() throws Exception {
+ destroyNextBatch();
+
+ if (finished) {
+ addForRecycle(bag);
+ }
+ }
+
+ private void destroyNextBatch() throws IgniteInternalCheckedException {
+ int workDone = 0;
+
+ while (!finished && workDone < WORK_UNITS_PER_STEP) {
+ long pageId = currentPageId;
+
+ long page = acquirePage(pageId);
+
+ try {
+ long pageAddr = writeLock(pageId, page);
+
+ if (pageAddr == 0L) {
+ // This page was possibly recycled, but we still need to destroy the rest of the tree.
+ workDone++;
+
+ positionToNextPageId();
+
+ continue;
+ }
+
+ try {
+ BplusIo<L> io = io(pageAddr);
+
+ if (io.isLeaf() != (currentLevel == 0)) {
+ // Leaf pages only at the level 0.
+ fail("Leaf level mismatch: " + currentLevel);
+ }
+
+ int cnt = io.getCount(pageAddr);
+
+ if (cnt < 0) {
+ fail("Negative count: " + cnt);
+ }
+
+ if (!io.isLeaf()) {
+ readChildrenPageIdsAndDescend(pageAddr, io, cnt);
+ } else {
+ if (actOnEachElement != null) {
+ io.visit(pageAddr, actOnEachElement);
+
+ workDone += io.getCount(pageAddr);
+ }
+
+ positionToNextPageId();
+ }
+
+ bag.addFreePage(recyclePage(pageId, pageAddr));
+
+ } finally {
+ writeUnlock(pageId, page, pageAddr, true);
+ }
+ } finally {
+ releasePage(pageId, page);
+ }
+
+ workDone++;
+
+ if (bag.size() >= 128) {
+ addForRecycle(bag);
+ }
+ }
+ }
+
+ private void readChildrenPageIdsAndDescend(long pageAddr, BplusIo<L> io, int cnt) {
+ long[] pageIds = new long[cnt + 1];
+
+ // When i == cnt it is the same as io.getRight(cnt - 1) but works for routing pages.
+ for (int i = 0; i <= cnt; i++) {
+ long leftId = inner(io).getLeft(pageAddr, i, partId);
+
+ inner(io).setLeft(pageAddr, i, 0);
+
+ pageIds[i] = leftId;
+ }
+
+ currentLevel--;
+ childrenPageIds[currentLevel] = pageIds;
+ currentChildIndices[currentLevel] = 0;
+ currentPageId = childrenPageIds[currentLevel][currentChildIndices[currentLevel]];
+ }
+
+ /**
+ * Either positions {@link #currentPageId} to the next ID that should be processed, or sets {@link #finished} to {@code true}.
+ */
+ private void positionToNextPageId() {
+ while (currentLevel < rootLevel) {
+ if (currentChildIndices[currentLevel] + 1 < childrenPageIds[currentLevel].length) {
+ // We can go right.
+ currentChildIndices[currentLevel]++;
+ currentPageId = childrenPageIds[currentLevel][currentChildIndices[currentLevel]];
+
+ return;
+ } else {
+ // Go up and try going right again.
+ currentLevel++;
+ }
+ }
+
+ // We were not able to find any more tree nodes, so our work is over.
+ finished = true;
+ }
+
+ @Override
+ public boolean isCompleted() {
+ return finished;
+ }
+ }
}
diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageTest.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/CompletedGradualTask.java
similarity index 51%
copy from modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageTest.java
copy to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/CompletedGradualTask.java
index c04aa304c2..7b35da5963 100644
--- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageTest.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/CompletedGradualTask.java
@@ -15,24 +15,21 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.storage.pagememory.mv;
+package org.apache.ignite.internal.pagememory.util;
-import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.storage.engine.StorageEngine;
-import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
-import org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryStorageEngineConfiguration;
-
-class VolatilePageMemoryMvPartitionStorageTest extends AbstractPageMemoryMvPartitionStorageTest {
- @InjectConfiguration
- private VolatilePageMemoryStorageEngineConfiguration engineConfig;
+/**
+ * A {@link GradualTask} implementation that is completed from the start.
+ */
+class CompletedGradualTask implements GradualTask {
+ static final GradualTask INSTANCE = new CompletedGradualTask();
@Override
- protected StorageEngine createEngine() {
- return new VolatilePageMemoryStorageEngine(engineConfig, ioRegistry);
+ public void runStep() {
+ // No-op.
}
@Override
- int pageSize() {
- return engineConfig.pageSize().value();
+ public boolean isCompleted() {
+ return true;
}
}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/GradualTask.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/GradualTask.java
new file mode 100644
index 0000000000..073c04c278
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/GradualTask.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.util;
+
+/**
+ * A possibly long-running task that is split into steps, each of which does not take too much time to execute.
+ * Such tasks are to be executed by a {@link GradualTaskExecutor} so that, even if there are more tasks running
+ * at the same time than there are threads in the underlying thread pool, all tasks will make progress due to their
+ * cooperative execution model (execution of their steps will be interleaved).
+ *
+ * <p>The task is stateful, it must track its progress itself.
+ *
+ * <p>The task might be considered a <a href="https://en.wikipedia.org/wiki/Continuation">continuation</a>.
+ *
+ * @see GradualTaskExecutor
+ */
+public interface GradualTask {
+ /**
+ * Returns an already completed task.
+ */
+ static GradualTask completed() {
+ return CompletedGradualTask.INSTANCE;
+ }
+
+ /**
+ * Runs the next task step. A step should not take too long, so that other gradual tasks in the same executor can make progress.
+ *
+ * @throws Exception If something goes wrong.
+ */
+ void runStep() throws Exception;
+
+ /**
+ * Returns {@code true} if the task is completed (so no steps should be run), or {@code false} if there are more
+ * steps to execute.
+ */
+ boolean isCompleted();
+
+ /**
+ * Executes some cleanup that must be done after the task finishes for any reason (all work done or exception thrown);
+ * it is analogous to {@code finally} block in Java.
+ */
+ default void cleanup() {
+ // No-op.
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/GradualTaskExecutor.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/GradualTaskExecutor.java
new file mode 100644
index 0000000000..0b74c3987a
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/GradualTaskExecutor.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.util;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.future.InFlightFutures;
+import org.apache.ignite.internal.util.IgniteUtils;
+
+/**
+ * Executor for {@link GradualTask}s. See GradualTask documentation for details.
+ *
+ * <p>This executor owns the executor service passed to it, so it shuts that service down when closed.
+ *
+ * @see GradualTask
+ */
+public class GradualTaskExecutor implements ManuallyCloseable {
+ private final ExecutorService executor;
+
+ private final InFlightFutures inFlightFutures = new InFlightFutures();
+
+ private volatile boolean cancelled = false;
+
+ public GradualTaskExecutor(ExecutorService executor) {
+ this.executor = executor;
+ }
+
+ /**
+ * Starts execution of a {@link GradualTask} and returns a future that completes when the task completes.
+ *
+ * @param task Task to execute.
+ * @return Future that completes when the task completes.
+ */
+ public CompletableFuture<Void> execute(GradualTask task) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+
+ inFlightFutures.registerFuture(future);
+
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ if (cancelled) {
+ future.completeExceptionally(new CancellationException("The executor has been closed"));
+
+ return;
+ }
+
+ if (task.isCompleted()) {
+ future.complete(null);
+
+ return;
+ }
+
+ task.runStep();
+
+ if (task.isCompleted()) {
+ future.complete(null);
+ } else if (cancelled) {
+ future.completeExceptionally(new CancellationException("The executor has been closed"));
+ } else {
+ executor.execute(this);
+ }
+ } catch (Error e) {
+ future.completeExceptionally(e);
+
+ throw e;
+ } catch (Exception e) {
+ future.completeExceptionally(e);
+ }
+ }
+ };
+
+ executor.execute(runnable);
+
+ return future.whenComplete((res, ex) -> task.cleanup());
+ }
+
+ @Override
+ public void close() {
+ cancelled = true;
+
+ IgniteUtils.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
+
+ inFlightFutures.cancelInFlightFutures();
+ }
+}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/util/GradualTaskExecutorTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/util/GradualTaskExecutorTest.java
new file mode 100644
index 0000000000..e18ff3ee40
--- /dev/null
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/util/GradualTaskExecutorTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.util;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.util.ExceptionUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class GradualTaskExecutorTest {
+ private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+ private final GradualTaskExecutor executor = new GradualTaskExecutor(executorService);
+
+ @Mock
+ private GradualTask task;
+
+ @AfterEach
+ void stopExecutor() {
+ executor.close();
+ }
+
+ @Test
+ void executesStepsTillTaskCompletes() {
+ TwoStageTask twoStageTask = new TwoStageTask();
+
+ CompletableFuture<Void> future = executor.execute(twoStageTask);
+
+ assertThat(future, willCompleteSuccessfully());
+
+ assertTrue(twoStageTask.isCompleted());
+ }
+
+ @Test
+ void doesNotExecuteStepsWhenTaskIsCompleted() throws Exception {
+ when(task.isCompleted()).thenReturn(true);
+
+ CompletableFuture<Void> future = executor.execute(task);
+
+ assertThat(future, willCompleteSuccessfully());
+
+ verify(task, never()).runStep();
+ }
+
+ @Test
+ void veryLongTaskAllowsOtherTasksToRun() {
+ GradualTask infiniteTask = mock(GradualTask.class);
+ when(infiniteTask.isCompleted()).thenReturn(false);
+
+ executor.execute(infiniteTask);
+
+ TwoStageTask twoStageTask = new TwoStageTask();
+
+ CompletableFuture<Void> future = executor.execute(twoStageTask);
+
+ assertThat(future, willCompleteSuccessfully());
+
+ assertTrue(twoStageTask.isCompleted());
+ }
+
+ @Test
+ void nonFinishedTasksAreCancelledWhenExecutorIsClosed() {
+ GradualTask infiniteTask = mock(GradualTask.class);
+ when(infiniteTask.isCompleted()).thenReturn(false);
+
+ CompletableFuture<Void> future = executor.execute(infiniteTask);
+
+ assertFalse(future.isDone());
+
+ executor.close();
+
+ Exception ex = assertThrows(Exception.class, () -> future.getNow(null));
+
+ assertTrue(
+ hasCause(ex, CancellationException.class, null) || hasCause(ex, RejectedExecutionException.class, null),
+ "Unexpected exception thrown: " + ExceptionUtils.getFullStackTrace(ex)
+ );
+ }
+
+ private static class TwoStageTask implements GradualTask {
+ private final AtomicInteger stepsRun = new AtomicInteger(0);
+
+ @Override
+ public void runStep() {
+ stepsRun.incrementAndGet();
+ }
+
+ @Override
+ public boolean isCompleted() {
+ return stepsRun.get() >= 2;
+ }
+ }
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/RowId.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/RowId.java
index 627ea03347..8e41c02c3b 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/RowId.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/RowId.java
@@ -38,6 +38,10 @@ public final class RowId implements Serializable, Comparable<RowId> {
return new RowId(partitionId, Long.MIN_VALUE, Long.MIN_VALUE);
}
+ public static RowId highestRowId(int partitionId) {
+ return new RowId(partitionId, Long.MAX_VALUE, Long.MAX_VALUE);
+ }
+
/**
* Create a row ID with the UUID value based on {@link UUID#randomUUID()}.
* Intended for tests only, because random UUIDs are very slow when it comes to frequent usages.
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index 324b4dae42..d9caa63a73 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -129,7 +129,7 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
* Tests that partition data does not overlap.
*/
@Test
- void testPartitionIndependence() throws Exception {
+ void testPartitionIndependence() {
MvPartitionStorage partitionStorage0 = tableStorage.getOrCreateMvPartition(PARTITION_ID_0);
// Using a shifted ID value to test a multibyte scenario.
MvPartitionStorage partitionStorage1 = tableStorage.getOrCreateMvPartition(PARTITION_ID_1);
@@ -499,6 +499,55 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
assertThrows(StorageRebalanceException.class, () -> tableStorage.startRebalancePartition(PARTITION_ID));
}
+ @Test
+ public void testDestroyTableStorage() throws Exception {
+ MvPartitionStorage mvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+ HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
+ SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
+
+ RowId rowId = new RowId(PARTITION_ID);
+
+ BinaryRow binaryRow = binaryRow(new TestKey(0, "0"), new TestValue(1, "1"));
+
+ IndexRow hashIndexRow = indexRow(hashIndexStorage.indexDescriptor(), binaryRow, rowId);
+ IndexRow sortedIndexRow = indexRow(sortedIndexStorage.indexDescriptor(), binaryRow, rowId);
+
+ mvPartitionStorage.runConsistently(() -> {
+ mvPartitionStorage.addWriteCommitted(rowId, binaryRow, clock.now());
+
+ hashIndexStorage.put(hashIndexRow);
+
+ sortedIndexStorage.put(sortedIndexRow);
+
+ return null;
+ });
+
+ Cursor<ReadResult> scanVersionsCursor = mvPartitionStorage.scanVersions(rowId);
+ PartitionTimestampCursor scanTimestampCursor = mvPartitionStorage.scan(clock.now());
+
+ Cursor<RowId> getFromHashIndexCursor = hashIndexStorage.get(hashIndexRow.indexColumns());
+
+ Cursor<RowId> getFromSortedIndexCursor = sortedIndexStorage.get(hashIndexRow.indexColumns());
+ Cursor<IndexRow> scanFromSortedIndexCursor = sortedIndexStorage.scan(null, null, 0);
+
+ tableStorage.destroy().get(1, SECONDS);
+
+ checkStorageDestroyed(mvPartitionStorage);
+ checkStorageDestroyed(hashIndexStorage);
+ checkStorageDestroyed(sortedIndexStorage);
+
+ assertThrows(StorageClosedException.class, () -> getAll(scanVersionsCursor));
+ assertThrows(StorageClosedException.class, () -> getAll(scanTimestampCursor));
+
+ assertThrows(StorageClosedException.class, () -> getAll(getFromHashIndexCursor));
+
+ assertThrows(StorageClosedException.class, () -> getAll(getFromSortedIndexCursor));
+ assertThrows(StorageClosedException.class, () -> getAll(scanFromSortedIndexCursor));
+
+ // Let's check that nothing will happen if we try to destroy it again.
+ assertThat(tableStorage.destroy(), willCompleteSuccessfully());
+ }
+
private static void createTestIndexes(TablesConfiguration tablesConfig) {
List<IndexDefinition> indexDefinitions = List.of(
SchemaBuilders.sortedIndex(SORTED_INDEX_NAME)
@@ -544,7 +593,7 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
}
}
- private void checkStorageDestroyed(MvPartitionStorage storage) throws Exception {
+ private void checkStorageDestroyed(MvPartitionStorage storage) {
int partId = PARTITION_ID;
assertThrows(StorageClosedException.class, () -> storage.runConsistently(() -> null));
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
index d543fa74c9..8d237c3645 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.storage.impl;
+import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import java.util.Map;
@@ -230,7 +231,15 @@ public class TestMvTableStorage implements MvTableStorage {
@Override
public CompletableFuture<Void> destroy() {
- return completedFuture(null);
+ stop();
+
+ CompletableFuture[] futures = new CompletableFuture[tableCfg.partitions().value()];
+
+ for (int partitionId = 0; partitionId < futures.length; partitionId++) {
+ futures[partitionId] = destroyPartition(partitionId);
+ }
+
+ return allOf(futures);
}
@Override
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
index e58a69fa9f..0c6759bb88 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.storage.pagememory;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@@ -108,6 +110,45 @@ public abstract class AbstractPageMemoryTableStorage implements MvTableStorage {
}
}
+ /**
+ * Destroys the table storage: marks it as not started, collects a destruction future for every partition that is
+ * either already being destroyed or still present, and calls {@link #finishDestruction()} once all of them succeed.
+ *
+ * @return Future that is already completed if there was nothing to destroy, otherwise a future that completes when
+ *      all collected partition destruction futures complete.
+ */
+ @Override
+ public CompletableFuture<Void> destroy() {
+ started = false;
+
+ List<CompletableFuture<Void>> destroyFutures = new ArrayList<>();
+
+ for (int i = 0; i < mvPartitions.length(); i++) {
+ CompletableFuture<Void> destroyPartitionFuture = partitionIdDestroyFutureMap.get(i);
+
+ if (destroyPartitionFuture != null) {
+ // A destroy future is already recorded for this partition: wait on it instead of starting another one.
+ destroyFutures.add(destroyPartitionFuture);
+ } else {
+ // Swap the slot to null and destroy whatever partition storage was there (if any).
+ AbstractPageMemoryMvPartitionStorage partition = mvPartitions.getAndUpdate(i, p -> null);
+
+ if (partition != null) {
+ destroyFutures.add(destroyMvPartitionStorage(partition));
+ }
+ }
+ }
+
+ if (destroyFutures.isEmpty()) {
+ // Nothing to wait for: finish right away on the calling thread.
+ finishDestruction();
+
+ return completedFuture(null);
+ } else {
+ return CompletableFuture.allOf(destroyFutures.toArray(CompletableFuture[]::new))
+ .whenComplete((unused, throwable) -> {
+ // Finish only on success; on failure the error is left to propagate through the returned future.
+ if (throwable == null) {
+ finishDestruction();
+ }
+ });
+ }
+ }
+
+ /**
+ * Executes actions needed to finish destruction after the table storage has been stopped and all partitions destroyed.
+ */
+ protected abstract void finishDestruction();
+
/**
* Returns a new instance of {@link AbstractPageMemoryMvPartitionStorage}.
*
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
index 5209d25ef0..2162a7bc56 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
@@ -17,11 +17,8 @@
package org.apache.ignite.internal.storage.pagememory;
-import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
-import java.util.ArrayList;
-import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -98,39 +95,8 @@ public class PersistentPageMemoryTableStorage extends AbstractPageMemoryTableSto
}
@Override
- public CompletableFuture<Void> destroy() {
- started = false;
-
- List<CompletableFuture<Void>> destroyFutures = new ArrayList<>();
-
- for (int i = 0; i < mvPartitions.length(); i++) {
- CompletableFuture<Void> destroyPartitionFuture = partitionIdDestroyFutureMap.get(i);
-
- if (destroyPartitionFuture != null) {
- destroyFutures.add(destroyPartitionFuture);
- } else {
- AbstractPageMemoryMvPartitionStorage partition = mvPartitions.getAndUpdate(i, p -> null);
-
- if (partition != null) {
- destroyFutures.add(destroyMvPartitionStorage(partition));
- }
- }
- }
-
- int tableId = tableCfg.tableId().value();
-
- if (destroyFutures.isEmpty()) {
- dataRegion.pageMemory().onGroupDestroyed(tableId);
-
- return completedFuture(null);
- } else {
- return CompletableFuture.allOf(destroyFutures.toArray(CompletableFuture[]::new))
- .whenComplete((unused, throwable) -> {
- if (throwable == null) {
- dataRegion.pageMemory().onGroupDestroyed(tableId);
- }
- });
- }
+ protected void finishDestruction() {
+ dataRegion.pageMemory().onGroupDestroyed(tableCfg.tableId().value());
}
@Override
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java
index 8282a82e28..296ed412f7 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java
@@ -18,12 +18,12 @@
package org.apache.ignite.internal.storage.pagememory;
import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
-import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
import org.apache.ignite.internal.pagememory.DataRegion;
import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.configuration.schema.VolatilePageMemoryDataRegionConfiguration;
-import org.apache.ignite.internal.pagememory.evict.PageEvictionTrackerNoOp;
+import org.apache.ignite.internal.pagememory.evict.PageEvictionTracker;
import org.apache.ignite.internal.pagememory.inmemory.VolatilePageMemory;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
@@ -48,6 +48,8 @@ public class VolatilePageMemoryDataRegion implements DataRegion<VolatilePageMemo
private final int pageSize;
+ private final PageEvictionTracker pageEvictionTracker;
+
private volatile VolatilePageMemory pageMemory;
private volatile RowVersionFreeList rowVersionFreeList;
@@ -60,16 +62,18 @@ public class VolatilePageMemoryDataRegion implements DataRegion<VolatilePageMemo
* @param cfg Data region configuration.
* @param ioRegistry IO registry.
* @param pageSize Page size in bytes.
+ * @param pageEvictionTracker Eviction tracker to use.
*/
public VolatilePageMemoryDataRegion(
VolatilePageMemoryDataRegionConfiguration cfg,
PageIoRegistry ioRegistry,
// TODO: IGNITE-17017 Move to common config
- int pageSize
- ) {
+ int pageSize,
+ PageEvictionTracker pageEvictionTracker) {
this.cfg = cfg;
this.ioRegistry = ioRegistry;
this.pageSize = pageSize;
+ this.pageEvictionTracker = pageEvictionTracker;
}
/**
@@ -91,7 +95,7 @@ public class VolatilePageMemoryDataRegion implements DataRegion<VolatilePageMemo
this.pageMemory = pageMemory;
}
- private static RowVersionFreeList createRowVersionFreeList(
+ private RowVersionFreeList createRowVersionFreeList(
PageMemory pageMemory
) throws IgniteInternalCheckedException {
long metaPageId = pageMemory.allocatePage(FREE_LIST_GROUP_ID, FREE_LIST_PARTITION_ID, FLAG_AUX);
@@ -106,7 +110,7 @@ public class VolatilePageMemoryDataRegion implements DataRegion<VolatilePageMemo
true,
// Because in memory.
null,
- PageEvictionTrackerNoOp.INSTANCE,
+ pageEvictionTracker,
IoStatisticsHolderNoOp.INSTANCE
);
}
@@ -125,7 +129,7 @@ public class VolatilePageMemoryDataRegion implements DataRegion<VolatilePageMemo
true,
// Because in memory.
null,
- PageEvictionTrackerNoOp.INSTANCE,
+ pageEvictionTracker,
IoStatisticsHolderNoOp.INSTANCE
);
}
@@ -134,9 +138,10 @@ public class VolatilePageMemoryDataRegion implements DataRegion<VolatilePageMemo
* Stops the in-memory data region.
*/
public void stop() throws Exception {
- closeAll(
- pageMemory != null ? () -> pageMemory.stop(true) : null,
- rowVersionFreeList != null ? rowVersionFreeList::close : null
+ closeAllManually(
+ rowVersionFreeList,
+ indexColumnsFreeList,
+ pageMemory != null ? () -> pageMemory.stop(true) : null
);
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataStorageModule.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataStorageModule.java
index 07ab8ba6b6..145461efeb 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataStorageModule.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataStorageModule.java
@@ -23,6 +23,7 @@ import com.google.auto.service.AutoService;
import java.nio.file.Path;
import org.apache.ignite.internal.components.LongJvmPauseDetector;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
+import org.apache.ignite.internal.pagememory.evict.PageEvictionTrackerNoOp;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
import org.apache.ignite.internal.storage.DataStorageModule;
import org.apache.ignite.internal.storage.StorageException;
@@ -59,6 +60,6 @@ public class VolatilePageMemoryDataStorageModule implements DataStorageModule {
ioRegistry.loadFromServiceLoader();
- return new VolatilePageMemoryStorageEngine(engineConfig, ioRegistry);
+ return new VolatilePageMemoryStorageEngine(igniteInstanceName, engineConfig, ioRegistry, PageEvictionTrackerNoOp.INSTANCE);
}
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
index ac47663e0a..6963bb9c71 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
@@ -23,18 +23,26 @@ import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.configuration.schema.VolatilePageMemoryDataRegionConfiguration;
import org.apache.ignite.internal.pagememory.configuration.schema.VolatilePageMemoryDataRegionView;
+import org.apache.ignite.internal.pagememory.evict.PageEvictionTracker;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.pagememory.util.GradualTaskExecutor;
import org.apache.ignite.internal.schema.configuration.TableConfiguration;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.engine.StorageEngine;
import org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryDataStorageView;
import org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryStorageEngineConfiguration;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
/**
* Storage engine implementation based on {@link PageMemory} for in-memory case.
@@ -43,24 +51,36 @@ public class VolatilePageMemoryStorageEngine implements StorageEngine {
/** Engine name. */
public static final String ENGINE_NAME = "aimem";
+ private static final IgniteLogger LOG = Loggers.forClass(VolatilePageMemoryStorageEngine.class);
+
+ private final String igniteInstanceName;
+
private final VolatilePageMemoryStorageEngineConfiguration engineConfig;
private final PageIoRegistry ioRegistry;
+ private final PageEvictionTracker pageEvictionTracker;
+
private final Map<String, VolatilePageMemoryDataRegion> regions = new ConcurrentHashMap<>();
+ private volatile GradualTaskExecutor destructionExecutor;
+
/**
* Constructor.
*
* @param engineConfig PageMemory storage engine configuration.
* @param ioRegistry IO registry.
+ * @param pageEvictionTracker Eviction tracker to use.
*/
public VolatilePageMemoryStorageEngine(
+ String igniteInstanceName,
VolatilePageMemoryStorageEngineConfiguration engineConfig,
- PageIoRegistry ioRegistry
- ) {
+ PageIoRegistry ioRegistry,
+ PageEvictionTracker pageEvictionTracker) {
+ this.igniteInstanceName = igniteInstanceName;
this.engineConfig = engineConfig;
this.ioRegistry = ioRegistry;
+ this.pageEvictionTracker = pageEvictionTracker;
}
@Override
@@ -83,11 +103,23 @@ public class VolatilePageMemoryStorageEngine implements StorageEngine {
return completedFuture(null);
}
});
+
+ int destructionThreads = Runtime.getRuntime().availableProcessors();
+
+ // With an unbounded work queue ThreadPoolExecutor never creates more than corePoolSize threads (extra threads are
+ // only started when the queue rejects a task), so a core size of 0 would leave a single worker regardless of the
+ // maximum. Core and maximum sizes are therefore equal, and core threads are allowed to time out so that no idle
+ // threads are kept when there is nothing to destroy.
+ ThreadPoolExecutor destructionThreadPool = new ThreadPoolExecutor(
+ destructionThreads,
+ destructionThreads,
+ 100,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(),
+ NamedThreadFactory.create(igniteInstanceName, "volatile-mv-partition-destruction", LOG)
+ );
+ destructionThreadPool.allowCoreThreadTimeOut(true);
+
+ destructionExecutor = new GradualTaskExecutor(destructionThreadPool);
}
/** {@inheritDoc} */
@Override
public void stop() throws StorageException {
+ destructionExecutor.close();
+
try {
closeAll(regions.values().stream().map(region -> region::stop));
} catch (Exception e) {
@@ -101,7 +133,7 @@ public class VolatilePageMemoryStorageEngine implements StorageEngine {
throws StorageException {
VolatilePageMemoryDataStorageView dataStorageView = (VolatilePageMemoryDataStorageView) tableCfg.dataStorage().value();
- return new VolatilePageMemoryTableStorage(tableCfg, tablesCfg, regions.get(dataStorageView.dataRegion()));
+ return new VolatilePageMemoryTableStorage(tableCfg, tablesCfg, regions.get(dataStorageView.dataRegion()), destructionExecutor);
}
/**
@@ -114,7 +146,12 @@ public class VolatilePageMemoryStorageEngine implements StorageEngine {
String name = dataRegionConfig.name().value();
- VolatilePageMemoryDataRegion dataRegion = new VolatilePageMemoryDataRegion(dataRegionConfig, ioRegistry, pageSize);
+ VolatilePageMemoryDataRegion dataRegion = new VolatilePageMemoryDataRegion(
+ dataRegionConfig,
+ ioRegistry,
+ pageSize,
+ pageEvictionTracker
+ );
dataRegion.start();
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
index c4755903d8..6110f22cd4 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
@@ -17,12 +17,11 @@
package org.apache.ignite.internal.storage.pagememory;
-import static java.util.concurrent.CompletableFuture.completedFuture;
-import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.pagememory.util.GradualTaskExecutor;
import org.apache.ignite.internal.pagememory.util.PageLockListenerNoOp;
import org.apache.ignite.internal.schema.configuration.TableConfiguration;
import org.apache.ignite.internal.schema.configuration.TableView;
@@ -40,6 +39,8 @@ import org.apache.ignite.lang.IgniteInternalCheckedException;
public class VolatilePageMemoryTableStorage extends AbstractPageMemoryTableStorage {
private final VolatilePageMemoryDataRegion dataRegion;
+ private final GradualTaskExecutor destructionExecutor;
+
/**
* Constructor.
*
@@ -49,11 +50,13 @@ public class VolatilePageMemoryTableStorage extends AbstractPageMemoryTableStora
public VolatilePageMemoryTableStorage(
TableConfiguration tableCfg,
TablesConfiguration tablesCfg,
- VolatilePageMemoryDataRegion dataRegion
+ VolatilePageMemoryDataRegion dataRegion,
+ GradualTaskExecutor destructionExecutor
) {
super(tableCfg, tablesCfg);
this.dataRegion = dataRegion;
+ this.destructionExecutor = destructionExecutor;
}
@Override
@@ -72,7 +75,8 @@ public class VolatilePageMemoryTableStorage extends AbstractPageMemoryTableStora
tablesConfiguration,
partitionId,
versionChainTree,
- indexMetaTree
+ indexMetaTree,
+ destructionExecutor
);
}
@@ -104,14 +108,8 @@ public class VolatilePageMemoryTableStorage extends AbstractPageMemoryTableStora
}
@Override
- public CompletableFuture<Void> destroy() {
- try {
- stop();
-
- return completedFuture(null);
- } catch (Throwable throwable) {
- return failedFuture(throwable);
- }
+ protected void finishDestruction() {
+ // No-op.
}
/**
@@ -121,7 +119,7 @@ public class VolatilePageMemoryTableStorage extends AbstractPageMemoryTableStora
* @param tableView Table configuration.
* @throws StorageException If failed.
*/
- VersionChainTree createVersionChainTree(int partId, TableView tableView) throws StorageException {
+ private VersionChainTree createVersionChainTree(int partId, TableView tableView) throws StorageException {
int grpId = tableView.tableId();
try {
@@ -166,7 +164,14 @@ public class VolatilePageMemoryTableStorage extends AbstractPageMemoryTableStora
@Override
CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
- // TODO: IGNITE-17833 Implement
- throw new UnsupportedOperationException();
+ mvPartitionStorage.close();
+
+ VolatilePageMemoryMvPartitionStorage volatilePartitionStorage = (VolatilePageMemoryMvPartitionStorage) mvPartitionStorage;
+
+ // We ignore the future returned by destroyStructures() on purpose: the destruction happens in the background,
+ // we don't care when it finishes.
+ volatilePartitionStorage.destroyStructures();
+
+ return CompletableFuture.completedFuture(null);
}
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
index 41b3e768e1..cc2e3e2c26 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
@@ -85,9 +85,9 @@ public class PageMemoryHashIndexStorage implements HashIndexStorage {
partitionId = hashIndexTree.partitionId();
- lowestRowId = new RowId(partitionId, Long.MIN_VALUE, Long.MIN_VALUE);
+ lowestRowId = RowId.lowestRowId(partitionId);
- highestRowId = new RowId(partitionId, Long.MAX_VALUE, Long.MAX_VALUE);
+ highestRowId = RowId.highestRowId(partitionId);
}
@Override
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
index 9d9180f1ed..5c2c639e5f 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
@@ -93,9 +93,9 @@ public class PageMemorySortedIndexStorage implements SortedIndexStorage {
partitionId = sortedIndexTree.partitionId();
- lowestRowId = new RowId(partitionId, Long.MIN_VALUE, Long.MIN_VALUE);
+ lowestRowId = RowId.lowestRowId(partitionId);
- highestRowId = new RowId(partitionId, Long.MAX_VALUE, Long.MAX_VALUE);
+ highestRowId = RowId.highestRowId(partitionId);
}
@Override
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index c516e31ff4..7df72b562b 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -176,7 +176,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
} else {
assert indexCfgView == null;
- //TODO IGNITE-17626 Drop the index synchronously.
+ //TODO: IGNITE-17626 Drop the index synchronously.
}
}
} catch (Exception e) {
@@ -368,11 +368,11 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
}
}
- private RowVersion readRowVersion(long nextLink, Predicate<HybridTimestamp> loadValue) {
+ RowVersion readRowVersion(long rowVersionLink, Predicate<HybridTimestamp> loadValue) {
ReadRowVersion read = new ReadRowVersion(partitionId);
try {
- rowVersionDataPageReader.traverse(nextLink, read, loadValue);
+ rowVersionDataPageReader.traverse(rowVersionLink, read, loadValue);
} catch (IgniteInternalCheckedException e) {
throw new StorageException("Row version lookup failed", e);
}
@@ -708,7 +708,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
}
}
- private RowVersion insertCommittedRowVersion(BinaryRow row, HybridTimestamp commitTimestamp, long nextPartitionlessLink) {
+ private RowVersion insertCommittedRowVersion(@Nullable BinaryRow row, HybridTimestamp commitTimestamp, long nextPartitionlessLink) {
byte[] rowBytes = rowBytes(row);
RowVersion rowVersion = new RowVersion(partitionId, commitTimestamp, nextPartitionlessLink, ByteBuffer.wrap(rowBytes));
@@ -742,7 +742,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
}
try {
- if (rowVersion.nextLink() == 0) {
+ if (!rowVersion.hasNextLink()) {
return null;
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
index 7e32ec1c38..2bc5eba659 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
@@ -322,6 +322,8 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
versionChainTree.close();
indexMetaTree.close();
+ blobStorage.close();
+
for (PageMemoryHashIndexStorage hashIndexStorage : hashIndexes.values()) {
hashIndexStorage.close();
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
index d6648474e5..4eabeb9faf 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
@@ -18,7 +18,11 @@
package org.apache.ignite.internal.storage.pagememory.mv;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.util.GradualTaskExecutor;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RaftGroupConfiguration;
@@ -27,12 +31,18 @@ import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryTableStor
import org.apache.ignite.internal.storage.pagememory.index.hash.PageMemoryHashIndexStorage;
import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMetaTree;
import org.apache.ignite.internal.storage.pagememory.index.sorted.PageMemorySortedIndexStorage;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
import org.jetbrains.annotations.Nullable;
/**
* Implementation of {@link MvPartitionStorage} based on a {@link BplusTree} for in-memory case.
*/
public class VolatilePageMemoryMvPartitionStorage extends AbstractPageMemoryMvPartitionStorage {
+ private static final Predicate<HybridTimestamp> NEVER_LOAD_VALUE = ts -> false;
+
+ private final GradualTaskExecutor destructionExecutor;
+
/** Last applied index value. */
private volatile long lastAppliedIndex;
@@ -50,13 +60,15 @@ public class VolatilePageMemoryMvPartitionStorage extends AbstractPageMemoryMvPa
* @param partitionId Partition id.
* @param versionChainTree Table tree for {@link VersionChain}.
* @param indexMetaTree Tree that contains SQL indexes' metadata.
+ * @param destructionExecutor Executor used to destroy partitions.
*/
public VolatilePageMemoryMvPartitionStorage(
VolatilePageMemoryTableStorage tableStorage,
TablesConfiguration tablesCfg,
int partitionId,
VersionChainTree versionChainTree,
- IndexMetaTree indexMetaTree
+ IndexMetaTree indexMetaTree,
+ GradualTaskExecutor destructionExecutor
) {
super(
partitionId,
@@ -67,26 +79,60 @@ public class VolatilePageMemoryMvPartitionStorage extends AbstractPageMemoryMvPa
indexMetaTree,
tablesCfg
);
+
+ this.destructionExecutor = destructionExecutor;
}
@Override
public <V> V runConsistently(WriteClosure<V> closure) throws StorageException {
- return closure.execute();
+ if (!closeBusyLock.enterBusy()) {
+ throwStorageClosedException();
+ }
+
+ try {
+ return closure.execute();
+ } finally {
+ closeBusyLock.leaveBusy();
+ }
}
@Override
public CompletableFuture<Void> flush() {
- return CompletableFuture.completedFuture(null);
+ if (!closeBusyLock.enterBusy()) {
+ throwStorageClosedException();
+ }
+
+ try {
+ return CompletableFuture.completedFuture(null);
+ } finally {
+ closeBusyLock.leaveBusy();
+ }
}
@Override
public long lastAppliedIndex() {
- return lastAppliedIndex;
+ if (!closeBusyLock.enterBusy()) {
+ throwStorageClosedException();
+ }
+
+ try {
+ return lastAppliedIndex;
+ } finally {
+ closeBusyLock.leaveBusy();
+ }
}
@Override
public long lastAppliedTerm() {
- return lastAppliedTerm;
+ if (!closeBusyLock.enterBusy()) {
+ throwStorageClosedException();
+ }
+
+ try {
+ return lastAppliedTerm;
+ } finally {
+ closeBusyLock.leaveBusy();
+ }
}
@Override
@@ -97,12 +143,28 @@ public class VolatilePageMemoryMvPartitionStorage extends AbstractPageMemoryMvPa
@Override
public long persistedIndex() {
- return lastAppliedIndex;
+ if (!closeBusyLock.enterBusy()) {
+ throwStorageClosedException();
+ }
+
+ try {
+ return lastAppliedIndex;
+ } finally {
+ closeBusyLock.leaveBusy();
+ }
}
@Override
public @Nullable RaftGroupConfiguration committedGroupConfiguration() {
- return groupConfig;
+ if (!closeBusyLock.enterBusy()) {
+ throwStorageClosedException();
+ }
+
+ try {
+ return groupConfig;
+ } finally {
+ closeBusyLock.leaveBusy();
+ }
}
@Override
@@ -132,4 +194,41 @@ public class VolatilePageMemoryMvPartitionStorage extends AbstractPageMemoryMvPa
hashIndexes.clear();
sortedIndexes.clear();
}
+
+ /**
+ * Destroys internal structures backing this partition.
+ *
+ * <p>Starts a gradual destruction of the version chain tree and hands the resulting task to the destruction executor.
+ * Every item passed to the destruction callback is treated as a {@link VersionChain} and its row versions are removed
+ * from the row version free list.
+ *
+ * @return future that completes when the destruction completes.
+ * @throws StorageException If starting the destruction fails with an {@link IgniteInternalCheckedException}; in this case the
+ *      exception is thrown synchronously rather than reported through the returned future.
+ */
+ public CompletableFuture<Void> destroyStructures() {
+ // TODO: IGNITE-18531 - destroy indices.
+
+ try {
+ // NOTE(review): the meaning of the trailing 'false' flag is defined by BplusTree#startGradualDestruction - confirm there.
+ return destructionExecutor.execute(
+ versionChainTree.startGradualDestruction(chainKey -> destroyVersionChain((VersionChain) chainKey), false)
+ );
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Cannot destroy MV partition in group=" + groupId + ", partition=" + partitionId, e);
+ }
+ }
+
+ /**
+ * Removes all row versions of the given version chain from the row version free list.
+ *
+ * <p>Used as the per-item callback of the tree destruction, hence the checked exception is rethrown wrapped in an
+ * {@link IgniteInternalException}.
+ *
+ * @param chainKey Version chain whose row versions must be removed.
+ */
+ private void destroyVersionChain(VersionChain chainKey) {
+ try {
+ deleteRowVersionsFromFreeList(chainKey);
+ } catch (IgniteInternalCheckedException e) {
+ throw new IgniteInternalException(e);
+ }
+ }
+
+ /**
+ * Walks the row versions of the chain starting from its head link and removes each of them from the row version free list.
+ *
+ * @param chain Version chain to walk.
+ * @throws IgniteInternalCheckedException If removal of a row version from the free list fails.
+ */
+ private void deleteRowVersionsFromFreeList(VersionChain chain) throws IgniteInternalCheckedException {
+ long rowVersionLink = chain.headLink();
+
+ while (rowVersionLink != PageIdUtils.NULL_LINK) {
+ // The 'loadValue' predicate always answers false here: only the links of the row version are needed.
+ RowVersion rowVersion = readRowVersion(rowVersionLink, NEVER_LOAD_VALUE);
+
+ rowVersionFreeList.removeDataRowByLink(rowVersion.link());
+
+ // The next link comes from the already read RowVersion object, so it is still available after the removal.
+ rowVersionLink = rowVersion.nextLink();
+ }
+ }
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainLeafIo.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainLeafIo.java
index 55e39c6b50..b0a98ddd3a 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainLeafIo.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainLeafIo.java
@@ -17,10 +17,13 @@
package org.apache.ignite.internal.storage.pagememory.mv.io;
+import java.util.function.Consumer;
import org.apache.ignite.internal.pagememory.io.IoVersions;
import org.apache.ignite.internal.pagememory.tree.BplusTree;
import org.apache.ignite.internal.pagememory.tree.io.BplusIo;
import org.apache.ignite.internal.pagememory.tree.io.BplusLeafIo;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.internal.storage.pagememory.mv.VersionChain;
import org.apache.ignite.internal.storage.pagememory.mv.VersionChainKey;
import org.apache.ignite.internal.storage.pagememory.mv.VersionChainTree;
@@ -60,6 +63,24 @@ public final class VersionChainLeafIo extends BplusLeafIo<VersionChainKey> imple
/** {@inheritDoc} */
@Override
public VersionChainKey getLookupRow(BplusTree<VersionChainKey, ?> tree, long pageAddr, int idx) {
- return getRow(pageAddr, idx, 0xFFFF);
+ return getRow(pageAddr, idx, getPartitionId(pageAddr));
+ }
+
+ /**
+ * Passes every version chain stored in the given leaf page to the consumer, in index order.
+ *
+ * @param pageAddr Address of the leaf page.
+ * @param c Consumer that receives each {@link VersionChain} read from the page.
+ */
+ @Override
+ public void visit(long pageAddr, Consumer<VersionChainKey> c) {
+ // The partition ID is derived once from the page's own ID and reused for every row of the page.
+ int partitionId = getPartitionId(pageAddr);
+
+ int count = getCount(pageAddr);
+
+ for (int i = 0; i < count; i++) {
+ VersionChain chain = getRow(pageAddr, i, partitionId);
+
+ c.accept(chain);
+ }
+ }
+
+ /**
+ * Extracts the partition ID from the ID of the page located at the given address.
+ *
+ * @param pageAddr Page address.
+ * @return Partition ID encoded in the page ID.
+ */
+ private static int getPartitionId(long pageAddr) {
+ long pageId = getPageId(pageAddr);
+ return PageIdUtils.partitionId(pageId);
+ }
}
diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java
index 1d37235ef3..d351034cfb 100644
--- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java
+++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java
@@ -17,17 +17,32 @@
package org.apache.ignite.internal.storage.pagememory;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.pagememory.evict.PageEvictionTracker;
+import org.apache.ignite.internal.pagememory.evict.PageEvictionTrackerNoOp;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.storage.AbstractMvTableStorageTest;
-import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryStorageEngineConfiguration;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
/**
@@ -35,16 +50,18 @@ import org.junit.jupiter.api.extension.ExtendWith;
*/
@ExtendWith(ConfigurationExtension.class)
public class VolatilePageMemoryMvTableStorageTest extends AbstractMvTableStorageTest {
+ private final PageEvictionTracker pageEvictionTracker = spy(PageEvictionTrackerNoOp.INSTANCE);
+
private VolatilePageMemoryStorageEngine engine;
- private MvTableStorage tableStorage;
+ private VolatilePageMemoryTableStorage tableStorage;
@BeforeEach
void setUp(
@InjectConfiguration
VolatilePageMemoryStorageEngineConfiguration engineConfig,
@InjectConfiguration(
- value = "mock.tables.foo{ partitions = 512, dataStorage.name = " + VolatilePageMemoryStorageEngine.ENGINE_NAME + "}"
+ "mock.tables.foo{ partitions = 512, dataStorage.name = " + VolatilePageMemoryStorageEngine.ENGINE_NAME + "}"
)
TablesConfiguration tablesConfig
) {
@@ -52,7 +69,7 @@ public class VolatilePageMemoryMvTableStorageTest extends AbstractMvTableStorage
ioRegistry.loadFromServiceLoader();
- engine = new VolatilePageMemoryStorageEngine(engineConfig, ioRegistry);
+ engine = new VolatilePageMemoryStorageEngine("node", engineConfig, ioRegistry, pageEvictionTracker);
engine.start();
@@ -71,18 +88,6 @@ public class VolatilePageMemoryMvTableStorageTest extends AbstractMvTableStorage
);
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-17833")
- @Override
- public void testDestroyPartition() throws Exception {
- super.testDestroyPartition();
- }
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-17833")
- @Override
- public void testReCreatePartition() throws Exception {
- super.testReCreatePartition();
- }
-
@Disabled("https://issues.apache.org/jira/browse/IGNITE-18028")
@Override
public void testSuccessRebalance() throws Exception {
@@ -100,4 +105,49 @@ public class VolatilePageMemoryMvTableStorageTest extends AbstractMvTableStorage
public void testStartRebalanceForClosedPartition() {
super.testStartRebalanceForClosedPartition();
}
+
+ /**
+ * Checks that destroying a single partition increases the number of empty data pages in the row version
+ * free list and makes the eviction tracker forget a page.
+ *
+ * @throws Exception If the test fails unexpectedly.
+ */
+ @Test
+ void partitionDestructionFreesPartitionPages() throws Exception {
+ // insertOneRow() builds the row ID with PARTITION_ID, so the very same partition is created and destroyed here.
+ MvPartitionStorage partitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+
+ insertOneRow(partitionStorage);
+
+ // Baseline taken after the insert and before the destruction starts.
+ long emptyDataPagesBeforeDestroy = tableStorage.dataRegion().rowVersionFreeList().emptyDataPages();
+
+ assertThat(tableStorage.destroyPartition(PARTITION_ID), willSucceedFast());
+
+ assertDestructionCompletes(emptyDataPagesBeforeDestroy);
+ }
+
+ /**
+ * Waits until the row version free list reports more empty data pages than before the destruction, then verifies
+ * that the eviction tracker was asked to forget exactly one page.
+ *
+ * @param emptyDataPagesBeforeDestroy Number of empty data pages observed before the destruction was started.
+ * @throws InterruptedException Declared by the wait; presumably thrown if the waiting thread is interrupted.
+ * @throws IgniteInternalCheckedException Declared for the calls made here; not expected during verification.
+ */
+ private void assertDestructionCompletes(long emptyDataPagesBeforeDestroy) throws InterruptedException, IgniteInternalCheckedException {
+ boolean pagesFreed = waitForCondition(
+ () -> tableStorage.dataRegion().rowVersionFreeList().emptyDataPages() > emptyDataPagesBeforeDestroy,
+ 5_000
+ );
+
+ // A bare assertTrue() gives no hint on timeout, so the baseline is reported explicitly.
+ assertTrue(pagesFreed, "Empty data pages count did not grow above " + emptyDataPagesBeforeDestroy + " before the wait timed out");
+
+ verify(pageEvictionTracker, times(1)).forgetPage(anyLong());
+ }
+
+ /**
+ * Writes a single committed row version into the given partition storage.
+ *
+ * @param partitionStorage Storage that receives the row.
+ */
+ private void insertOneRow(MvPartitionStorage partitionStorage) {
+ TestKey key = new TestKey(0, "0");
+ TestValue value = new TestValue(1, "1");
+ BinaryRow row = binaryRow(key, value);
+
+ partitionStorage.runConsistently(() -> {
+ RowId rowId = new RowId(PARTITION_ID);
+
+ partitionStorage.addWriteCommitted(rowId, row, clock.now());
+
+ return null;
+ });
+ }
+
+ /**
+ * Checks that destroying the whole table storage increases the number of empty data pages in the row version
+ * free list and makes the eviction tracker forget a page.
+ *
+ * @throws Exception If the test fails unexpectedly.
+ */
+ @Test
+ void tableStorageDestructionFreesPartitionsPages() throws Exception {
+ // insertOneRow() builds the row ID with PARTITION_ID, so the partition is created with the same ID.
+ MvPartitionStorage partitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+
+ insertOneRow(partitionStorage);
+
+ // Baseline taken after the insert and before the destruction starts.
+ long emptyDataPagesBeforeDestroy = tableStorage.dataRegion().rowVersionFreeList().emptyDataPages();
+
+ assertThat(tableStorage.destroy(), willSucceedFast());
+
+ assertDestructionCompletes(emptyDataPagesBeforeDestroy);
+ }
}
diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/VolatilePageMemoryHashIndexStorageTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/VolatilePageMemoryHashIndexStorageTest.java
index 5fc430f598..22fe4e8062 100644
--- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/VolatilePageMemoryHashIndexStorageTest.java
+++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/VolatilePageMemoryHashIndexStorageTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.storage.pagememory.index;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.pagememory.evict.PageEvictionTrackerNoOp;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
@@ -51,7 +52,7 @@ class VolatilePageMemoryHashIndexStorageTest extends AbstractPageMemoryHashIndex
ioRegistry.loadFromServiceLoader();
- engine = new VolatilePageMemoryStorageEngine(engineConfig, ioRegistry);
+ engine = new VolatilePageMemoryStorageEngine("node", engineConfig, ioRegistry, PageEvictionTrackerNoOp.INSTANCE);
engine.start();
diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/VolatilePageMemorySortedIndexStorageTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/VolatilePageMemorySortedIndexStorageTest.java
index c31aa12556..c56180e277 100644
--- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/VolatilePageMemorySortedIndexStorageTest.java
+++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/VolatilePageMemorySortedIndexStorageTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.storage.pagememory.index;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.pagememory.evict.PageEvictionTrackerNoOp;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
@@ -51,8 +52,7 @@ class VolatilePageMemorySortedIndexStorageTest extends AbstractPageMemorySortedI
ioRegistry.loadFromServiceLoader();
- engine = new VolatilePageMemoryStorageEngine(engineConfig, ioRegistry);
-
+ engine = new VolatilePageMemoryStorageEngine("node", engineConfig, ioRegistry, PageEvictionTrackerNoOp.INSTANCE);
engine.start();
table = engine.createMvTable(tablesConfig.tables().get("foo"), tablesConfig);
diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/BlobStorageTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/BlobStorageTest.java
index 29759465d5..a1ee663501 100644
--- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/BlobStorageTest.java
+++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/BlobStorageTest.java
@@ -27,13 +27,11 @@ import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.pagememory.PageIdAllocator;
import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.configuration.schema.VolatilePageMemoryDataRegionConfiguration;
import org.apache.ignite.internal.pagememory.inmemory.VolatilePageMemory;
@@ -153,14 +151,6 @@ class BlobStorageTest {
@Test
void freedPagesAreRecycled() throws Exception {
- List<Long> allocatedPageIds = new ArrayList<>();
-
- when(pageMemory.allocatePage(1, 1, PageIdAllocator.FLAG_AUX)).then(invocation -> {
- long pageId = (long) invocation.callRealMethod();
- allocatedPageIds.add(pageId);
- return pageId;
- });
-
long pageId = blobStorage.addBlob(new byte[PAGE_SIZE * 2]);
blobStorage.updateBlob(pageId, new byte[0]);
diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageConcurrencyTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageConcurrencyTest.java
index fff0392187..22f2efcf32 100644
--- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageConcurrencyTest.java
+++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageConcurrencyTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.storage.pagememory.mv;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.pagememory.evict.PageEvictionTrackerNoOp;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
import org.apache.ignite.internal.storage.AbstractMvPartitionStorageConcurrencyTest;
import org.apache.ignite.internal.storage.engine.StorageEngine;
@@ -39,6 +40,6 @@ class VolatilePageMemoryMvPartitionStorageConcurrencyTest extends AbstractMvPart
ioRegistry.loadFromServiceLoader();
- return new VolatilePageMemoryStorageEngine(engineConfig, ioRegistry);
+ return new VolatilePageMemoryStorageEngine("node", engineConfig, ioRegistry, PageEvictionTrackerNoOp.INSTANCE);
}
}
diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageTest.java
index c04aa304c2..748d42b395 100644
--- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageTest.java
+++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.storage.pagememory.mv;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.pagememory.evict.PageEvictionTrackerNoOp;
import org.apache.ignite.internal.storage.engine.StorageEngine;
import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
import org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryStorageEngineConfiguration;
@@ -28,7 +29,7 @@ class VolatilePageMemoryMvPartitionStorageTest extends AbstractPageMemoryMvParti
@Override
protected StorageEngine createEngine() {
- return new VolatilePageMemoryStorageEngine(engineConfig, ioRegistry);
+ return new VolatilePageMemoryStorageEngine("node", engineConfig, ioRegistry, PageEvictionTrackerNoOp.INSTANCE);
}
@Override
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
index e857c368a6..51ea832730 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
@@ -153,21 +153,31 @@ public class RocksDbMvTableStorageTest extends AbstractMvTableStorageTest {
assertThat(tableStorage.isVolatile(), is(false));
}
+ @Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-18027") // Not executed; the annotation value references the tracking ticket.
@Override
public void testSuccessRebalance() throws Exception {
super.testSuccessRebalance(); // Delegates to the inherited test unchanged.
}
+ @Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-18027") // Not executed; the annotation value references the tracking ticket.
@Override
public void testFailRebalance() throws Exception {
super.testFailRebalance(); // Delegates to the inherited test unchanged.
}
+ @Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-18027") // Not executed; the annotation value references the tracking ticket.
@Override
public void testStartRebalanceForClosedPartition() {
super.testStartRebalanceForClosedPartition(); // Delegates to the inherited test unchanged.
}
+
+ @Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18523") // Not executed; the annotation value references the tracking ticket.
+ @Override
+ public void testDestroyTableStorage() throws Exception {
+ super.testDestroyTableStorage(); // Delegates to the inherited test unchanged.
+ }
}