You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2020/01/29 09:26:02 UTC
[ignite] branch master updated: IGNITE-12496 Correctly handle
checkpoint read lock during index drop - Fixes #7208.
This is an automated email from the ASF dual-hosted git repository.
agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new a1b939a4 IGNITE-12496 Correctly handle checkpoint read lock during index drop - Fixes #7208.
a1b939a4 is described below
commit a1b939a40bfa9be36aa94bb37696dcb0488c9ec4
Author: denis-chudov <dc...@gridgain.com>
AuthorDate: Wed Jan 29 12:06:19 2020 +0300
IGNITE-12496 Correctly handle checkpoint read lock during index drop - Fixes #7208.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
.../apache/ignite/internal/GridKernalContext.java | 6 +
.../ignite/internal/GridKernalContextImpl.java | 12 +
.../org/apache/ignite/internal/IgniteKernal.java | 2 +
.../cache/IgniteCacheOffheapManagerImpl.java | 2 +-
.../persistence/file/FilePageStoreManager.java | 13 +-
.../pendingtask/DurableBackgroundTask.java | 41 ++
.../cache/persistence/tree/BPlusTree.java | 262 ++++++--
.../wal/reader/StandaloneGridKernalContext.java | 6 +
.../cluster/GridClusterStateProcessor.java | 2 +
.../localtask/DurableBackgroundTasksProcessor.java | 265 ++++++++
.../apache/ignite/internal/util/IgniteUtils.java | 30 +
.../ignite/testframework/ListeningTestLogger.java | 12 +
.../h2/DurableBackgroundCleanupIndexTreeTask.java | 179 ++++++
.../internal/processors/query/h2/H2Utils.java | 3 +
.../processors/query/h2/database/H2Tree.java | 16 +-
.../query/h2/database/H2TreeClientIndex.java | 2 +-
.../processors/query/h2/database/H2TreeIndex.java | 125 +++-
.../query/h2/database/H2TreeIndexBase.java | 16 +-
.../processors/query/h2/opt/GridH2IndexBase.java | 9 +
.../processors/query/h2/opt/GridH2Table.java | 7 +-
.../db/LongDestroyDurableBackgroundTaskTest.java | 676 +++++++++++++++++++++
.../testsuites/IgnitePdsWithIndexingTestSuite.java | 4 +-
22 files changed, 1607 insertions(+), 83 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index bc027e4..5be2327 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopHelper;
import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter;
import org.apache.ignite.internal.processors.igfs.IgfsHelper;
import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter;
+import org.apache.ignite.internal.processors.localtask.DurableBackgroundTasksProcessor;
import org.apache.ignite.internal.processors.job.GridJobProcessor;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor;
@@ -770,4 +771,9 @@ public interface GridKernalContext extends Iterable<GridComponent> {
* @return {@code True} if node is in recovery mode (before join to topology).
*/
public boolean recoveryMode();
+
+ /**
+ * @return Local continuous tasks processor.
+ */
+ public DurableBackgroundTasksProcessor durableBackgroundTasksProcessor();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 4ee00cb..4bd3aa9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -71,6 +71,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopHelper;
import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter;
import org.apache.ignite.internal.processors.igfs.IgfsHelper;
import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter;
+import org.apache.ignite.internal.processors.localtask.DurableBackgroundTasksProcessor;
import org.apache.ignite.internal.processors.job.GridJobProcessor;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor;
@@ -419,6 +420,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
private LongJVMPauseDetector pauseDetector;
/** */
+ @GridToStringExclude
+ private DurableBackgroundTasksProcessor durableBackgroundTasksProcessor;
+
+ /** */
private Thread.UncaughtExceptionHandler hnd;
/** */
@@ -702,6 +707,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
compressProc = (CompressionProcessor)comp;
else if (comp instanceof DiagnosticProcessor)
diagnosticProcessor = (DiagnosticProcessor)comp;
+ else if (comp instanceof DurableBackgroundTasksProcessor)
+ durableBackgroundTasksProcessor = (DurableBackgroundTasksProcessor)comp;
else if (!(comp instanceof DiscoveryNodeValidationProcessor
|| comp instanceof PlatformPluginProcessor))
assert (comp instanceof GridPluginComponent) : "Unknown manager class: " + comp.getClass();
@@ -1305,4 +1312,9 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
@Override public String toString() {
return S.toString(GridKernalContextImpl.class, this);
}
+
+ /** {@inheritDoc} */
+ @Override public DurableBackgroundTasksProcessor durableBackgroundTasksProcessor() {
+ return durableBackgroundTasksProcessor;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 3c4698d..1840451 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -159,6 +159,7 @@ import org.apache.ignite.internal.processors.hadoop.Hadoop;
import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter;
import org.apache.ignite.internal.processors.job.GridJobProcessor;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
+import org.apache.ignite.internal.processors.localtask.DurableBackgroundTasksProcessor;
import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor;
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
import org.apache.ignite.internal.processors.metric.GridMetricManager;
@@ -1257,6 +1258,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
startProcessor(createComponent(PlatformProcessor.class, ctx));
startProcessor(new DistributedMetaStorageImpl(ctx));
startProcessor(new DistributedConfigurationProcessor(ctx));
+ startProcessor(new DurableBackgroundTasksProcessor(ctx));
startTimer.finishGlobalStage("Start processors");
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index c5a42bd..44b15ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -2940,7 +2940,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
ex.addSuppressed(e);
}
}
- });
+ }, false);
if (exception.get() != null)
throw new IgniteCheckedException("Failed to destroy store", exception.get());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index 83595ab..2f95335 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -89,7 +89,6 @@ import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import static java.lang.String.format;
import static java.nio.file.Files.delete;
import static java.nio.file.Files.newDirectoryStream;
import static java.util.Objects.requireNonNull;
@@ -1339,17 +1338,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
* Cancels async tasks.
*/
public void awaitAsyncTaskCompletion(boolean cancel) {
- for (GridWorker worker : workers) {
- try {
- if (cancel)
- worker.cancel();
-
- worker.join();
- }
- catch (Exception e) {
- log.warning(format("Failed to cancel grid runnable [%s]: %s", worker.toString(), e.getMessage()));
- }
- }
+ U.awaitForWorkersStop(workers, cancel, log);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTask.java
new file mode 100644
index 0000000..a9e8d9e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTask.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.GridKernalContext;
+
+/**
+ * Durable task that should be used to do long operations (e.g. index deletion) in background
+ * for cases when node with persistence can fail before operation is completed. After start, node reads it's
+ * pending background tasks from metastorage and completes them.
+ */
+public interface DurableBackgroundTask extends Serializable {
+ /**
+ * Short unique name of the task is used to build metastorage key for saving this task and for logging.
+ *
+ * @return Short name of this task.
+ */
+ public String shortName();
+
+ /**
+ * Method that executes the task. It is called after node start.
+ *
+ * @param ctx Grid kernal context.
+ */
+ public void execute(GridKernalContext ctx);
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index c98a7b0..1095243 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -21,6 +21,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
+import java.util.Deque;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -68,6 +70,7 @@ import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.IgniteTree;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.lang.GridTreePrinter;
+import org.apache.ignite.internal.util.lang.GridTuple3;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.SB;
@@ -905,6 +908,16 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
* @throws IgniteCheckedException If failed.
*/
private TreeMetaData treeMeta() throws IgniteCheckedException {
+ return treeMeta(0L);
+ }
+
+ /**
+ * @param metaPageAddr Meta page address. If equals {@code 0}, it means that we should do read lock on
+ * meta page and get meta page address. Otherwise we will not do the lock and will use the given address.
+ * @return Tree meta data.
+ * @throws IgniteCheckedException If failed.
+ */
+ private TreeMetaData treeMeta(final long metaPageAddr) throws IgniteCheckedException {
TreeMetaData meta0 = treeMeta;
if (meta0 != null)
@@ -912,10 +925,16 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
final long metaPage = acquirePage(metaPageId);
try {
- long pageAddr = readLock(metaPageId, metaPage); // Meta can't be removed.
+ long pageAddr;
- assert pageAddr != 0 : "Failed to read lock meta page [metaPageId=" +
- U.hexLong(metaPageId) + ']';
+ if (metaPageAddr == 0L) {
+ pageAddr = readLock(metaPageId, metaPage);
+
+ assert pageAddr != 0 : "Failed to read lock meta page [metaPageId=" +
+ U.hexLong(metaPageId) + ']';
+ }
+ else
+ pageAddr = metaPageAddr;
try {
BPlusMetaIO io = BPlusMetaIO.VERSIONS.forPage(pageAddr);
@@ -926,7 +945,8 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
treeMeta = meta0 = new TreeMetaData(rootLvl, rootId);
}
finally {
- readUnlock(metaPageId, metaPage, pageAddr);
+ if (metaPageAddr == 0L)
+ readUnlock(metaPageId, metaPage, pageAddr);
}
}
finally {
@@ -941,7 +961,17 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
* @throws IgniteCheckedException If failed.
*/
private int getRootLevel() throws IgniteCheckedException {
- TreeMetaData meta0 = treeMeta();
+ return getRootLevel(0L);
+ }
+
+ /**
+ * @param metaPageAddr Meta page address. If equals {@code 0}, it means that we should do read lock on
+ * meta page and get meta page address. Otherwise we will not do the lock and will use the given address.
+ * @return Root level.
+ * @throws IgniteCheckedException If failed.
+ */
+ private int getRootLevel(long metaPageAddr) throws IgniteCheckedException {
+ TreeMetaData meta0 = treeMeta(metaPageAddr);
assert meta0 != null;
@@ -955,7 +985,19 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
* @return Page ID.
*/
private long getFirstPageId(long metaId, long metaPage, int lvl) {
- long pageAddr = readLock(metaId, metaPage); // Meta can't be removed.
+ return getFirstPageId(metaId, metaPage, lvl, 0L);
+ }
+
+ /**
+ * @param metaId Meta page ID.
+ * @param metaPage Meta page pointer.
+ * @param lvl Level, if {@code 0} then it is a bottom level, if negative then root.
+ * @param metaPageAddr Meta page address. If equals {@code 0}, it means that we should do read lock on
+ * meta page and get meta page address. Otherwise we will not do the lock and will use the given address.
+ * @return Page ID.
+ */
+ private long getFirstPageId(long metaId, long metaPage, int lvl, final long metaPageAddr) {
+ long pageAddr = metaPageAddr != 0L ? metaPageAddr : readLock(metaId, metaPage); // Meta can't be removed.
try {
BPlusMetaIO io = BPlusMetaIO.VERSIONS.forPage(pageAddr);
@@ -969,7 +1011,8 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
return io.getFirstPageId(pageAddr, lvl);
}
finally {
- readUnlock(metaId, metaPage, pageAddr);
+ if (metaPageAddr == 0L)
+ readUnlock(metaId, metaPage, pageAddr);
}
}
@@ -2245,6 +2288,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
long cnt = 0;
long curPage = acquirePage(curPageId);
+
try {
long curPageAddr = readLock(curPageId, curPage);
@@ -2399,6 +2443,39 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
}
/**
+ * Releases the lock that is held by long tree destroy process for a short period of time and acquires it again,
+ * allowing other processes to acquire it.
+ */
+ protected void temporaryReleaseLock() {
+ // No-op.
+ }
+
+ /**
+ * Releases the lock that is held by long tree destroy process for a short period of time and acquires it again,
+ * allowing other processes to acquire it.
+ * @param lockedPages Deque of locked pages. {@link GridTuple3} contains page id, page pointer and page address.
+ * Pages are ordered in that order as they were locked by destroy method. We unlock them in reverse order and
+ * unlock in direct order.
+ */
+ private void temporaryReleaseLock(Deque<GridTuple3<Long, Long, Long>> lockedPages) {
+ lockedPages.iterator().forEachRemaining(t -> writeUnlock(t.get1(), t.get2(), t.get3(), true));
+
+ temporaryReleaseLock();
+
+ lockedPages.descendingIterator().forEachRemaining(t -> writeLock(t.get1(), t.get2()));
+ }
+
+ /**
+ * Maximum time for which tree destroy process is allowed to hold the lock, after this time exceeds,
+ * {@link BPlusTree#temporaryReleaseLock()} is called and hold time is reset.
+ *
+ * @return Time, in milliseconds.
+ */
+ protected long maxLockHoldTime() {
+ return Long.MAX_VALUE;
+ }
+
+ /**
* Destroys tree. This method is allowed to be invoked only when the tree is out of use (no concurrent operations
* are trying to read or update the tree after destroy beginning).
*
@@ -2408,7 +2485,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
* @throws IgniteCheckedException If failed.
*/
public final long destroy() throws IgniteCheckedException {
- return destroy(null);
+ return destroy(null, false);
}
/**
@@ -2416,13 +2493,15 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
* are trying to read or update the tree after destroy beginning).
*
* @param c Visitor closure. Visits only leaf pages.
+ * @param forceDestroy Whether to proceed with destroying, even if tree is already marked as destroyed (see
+ * {@link #markDestroyed()}).
* @return Number of pages recycled from this tree. If the tree was destroyed by someone else concurrently returns
* {@code 0}, otherwise it should return at least {@code 2} (for meta page and root page), unless this tree is
* used as metadata storage, or {@code -1} if we don't have a reuse list and did not do recycling at all.
* @throws IgniteCheckedException If failed.
*/
- public final long destroy(IgniteInClosure<L> c) throws IgniteCheckedException {
- if (!markDestroyed())
+ public final long destroy(IgniteInClosure<L> c, boolean forceDestroy) throws IgniteCheckedException {
+ if (!markDestroyed() && !forceDestroy)
return 0;
if (reuseList == null)
@@ -2432,68 +2511,150 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
long pagesCnt = 0;
+ AtomicLong lockHoldStartTime = new AtomicLong(U.currentTimeMillis());
+
+ Deque<GridTuple3<Long, Long, Long>> lockedPages = new LinkedList<>();
+
+ final long lockMaxTime = maxLockHoldTime();
+
long metaPage = acquirePage(metaPageId);
- try {
+
+ try {
long metaPageAddr = writeLock(metaPageId, metaPage); // No checks, we must be out of use.
- assert metaPageAddr != 0L;
+ lockedPages.push(new GridTuple3<>(metaPageId, metaPage, metaPageAddr));
try {
- for (long pageId : getFirstPageIds(metaPageAddr)) {
- assert pageId != 0;
+ assert metaPageAddr != 0L;
- do {
- final long pId = pageId;
+ int rootLvl = getRootLevel(metaPageAddr);
- long page = acquirePage(pId);
+ if (rootLvl < 0)
+ fail("Root level: " + rootLvl);
- try {
- long pageAddr = writeLock(pId, page); // No checks, we must be out of use.
+ long rootPageId = getFirstPageId(metaPageId, metaPage, rootLvl, metaPageAddr);
- try {
- BPlusIO<L> io = io(pageAddr);
+ pagesCnt += destroyDownPages(bag, rootPageId, rootLvl, c, lockHoldStartTime, lockMaxTime, lockedPages);
- if (c != null && io.isLeaf())
- io.visit(pageAddr, c);
+ bag.addFreePage(recyclePage(metaPageId, metaPage, metaPageAddr, null));
- long fwdPageId = io.getForward(pageAddr);
+ pagesCnt++;
+ }
+ finally {
+ writeUnlock(metaPageId, metaPage, metaPageAddr, true);
- bag.addFreePage(recyclePage(pageId, page, pageAddr, null));
- pagesCnt++;
+ lockedPages.pop();
+ }
+ }
+ finally {
+ releasePage(metaPageId, metaPage);
+ }
- pageId = fwdPageId;
- }
- finally {
- writeUnlock(pId, page, pageAddr, true);
- }
- }
- finally {
- releasePage(pId, page);
- }
+ reuseList.addForRecycle(bag);
- if (bag.size() == 128) {
- reuseList.addForRecycle(bag);
+ assert bag.isEmpty() : bag.size();
- assert bag.isEmpty() : bag.size();
- }
+ return pagesCnt;
+ }
+
+ /**
+ * Recursively destroys tree pages. Should be initially called with id of root page as {@code pageId}
+ * and root level as {@code lvl}.
+ *
+ * @param bag Reuse bag.
+ * @param pageId Page id.
+ * @param lvl Current level of tree.
+ * @param c Visitor closure. Visits only leaf pages.
+ * @param lockHoldStartTime When lock has been aquired last time.
+ * @param lockMaxTime Maximum time to hold the lock.
+ * @param lockedPages Deque of locked pages. Is used to release write-locked pages when temporary releasing
+ * checkpoint read lock.
+ * @return Count of destroyed pages.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected long destroyDownPages(
+ LongListReuseBag bag,
+ long pageId,
+ int lvl,
+ IgniteInClosure<L> c,
+ AtomicLong lockHoldStartTime,
+ long lockMaxTime,
+ Deque<GridTuple3<Long, Long, Long>> lockedPages
+ ) throws IgniteCheckedException {
+ if (pageId == 0)
+ return 0;
+
+ long pagesCnt = 0;
+
+ long page = acquirePage(pageId);
+
+ try {
+ long pageAddr = writeLock(pageId, page);
+
+ if (pageAddr == 0L)
+ return 0; // This page was possibly recycled, but we still need to destroy the rest of the tree.
+
+ lockedPages.push(new GridTuple3<>(pageId, page, pageAddr));
+
+ try {
+ BPlusIO<L> io = io(pageAddr);
+
+ if (io.isLeaf() != (lvl == 0)) // Leaf pages only at the level 0.
+ fail("Leaf level mismatch: " + lvl);
+
+ int cnt = io.getCount(pageAddr);
+
+ if (cnt < 0)
+ fail("Negative count: " + cnt);
+
+ if (!io.isLeaf()) {
+ // Recursively go down if we are on inner level.
+ // When i == cnt it is the same as io.getRight(cnt - 1) but works for routing pages.
+ for (int i = 0; i <= cnt; i++) {
+ long leftId = inner(io).getLeft(pageAddr, i);
+
+ inner(io).setLeft(pageAddr, i, 0);
+
+ pagesCnt += destroyDownPages(
+ bag,
+ leftId,
+ lvl - 1,
+ c,
+ lockHoldStartTime,
+ lockMaxTime,
+ lockedPages
+ );
}
- while (pageId != 0);
}
- bag.addFreePage(recyclePage(metaPageId, metaPage, metaPageAddr, null));
+ if (c != null && io.isLeaf())
+ io.visit(pageAddr, c);
+
+ bag.addFreePage(recyclePage(pageId, page, pageAddr, null));
+
pagesCnt++;
}
finally {
- writeUnlock(metaPageId, metaPage, metaPageAddr, true);
+ writeUnlock(pageId, page, pageAddr, true);
+
+ lockedPages.pop();
+ }
+
+ if (U.currentTimeMillis() - lockHoldStartTime.get() > lockMaxTime) {
+ temporaryReleaseLock(lockedPages);
+
+ lockHoldStartTime.set(U.currentTimeMillis());
}
}
finally {
- releasePage(metaPageId, metaPage);
+ releasePage(pageId, page);
}
- reuseList.addForRecycle(bag);
+ if (bag.size() == 128) {
+ reuseList.addForRecycle(bag);
- assert bag.isEmpty() : bag.size();
+ assert bag.isEmpty() : bag.size();
+ }
return pagesCnt;
}
@@ -2501,7 +2662,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
/**
* @return {@code True} if state was changed.
*/
- private boolean markDestroyed() {
+ public boolean markDestroyed() {
return destroyed.compareAndSet(false, true);
}
@@ -5955,4 +6116,13 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
if (failureProcessor != null)
failureProcessor.process(new FailureContext(failureType, e));
}
+
+ /**
+ * Returns meta page id.
+ *
+ * @return Meta page id.
+ */
+ public long getMetaPageId() {
+ return metaPageId;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index e7e06be..e465545 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -72,6 +72,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsHelper;
import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter;
import org.apache.ignite.internal.processors.job.GridJobProcessor;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
+import org.apache.ignite.internal.processors.localtask.DurableBackgroundTasksProcessor;
import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor;
import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
import org.apache.ignite.internal.processors.metric.GridMetricManager;
@@ -736,4 +737,9 @@ public class StandaloneGridKernalContext implements GridKernalContext {
@Override public DiagnosticProcessor diagnostic() {
return null;
}
+
+ /** {@inheritDoc} */
+ @Override public DurableBackgroundTasksProcessor durableBackgroundTasksProcessor() {
+ return null;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 1c7fb55..682c6dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -537,6 +537,8 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
ctx.cache().onStateChangeFinish(msg);
+ ctx.durableBackgroundTasksProcessor().onStateChangeFinish(msg);
+
if (readOnly(discoClusterState.lastState()) || readOnly(globalState.state()))
ctx.cache().context().readOnlyMode(readOnly(globalState.state()));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
new file mode 100644
index 0000000..05d82bf
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.localtask;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.client.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageTree;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.thread.IgniteThread;
+
+import static org.apache.ignite.internal.util.IgniteUtils.awaitForWorkersStop;
+
+/**
+ * Processor that is responsible for durable background tasks that are executed on local node
+ * and should be continued even after node restart.
+ */
+public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implements MetastorageLifecycleListener {
+ /** Prefix for metastorage keys for durable background tasks. */
+ private static final String STORE_DURABLE_BACKGROUND_TASK_PREFIX = "durable-background-task-";
+
+ /** Metastorage. */
+ private volatile ReadWriteMetastorage metastorage;
+
+ /** Metastorage synchronization mutex. */
+ private final Object metaStorageMux = new Object();
+
+ /** Set of workers that executing durable background tasks. */
+ private final Set<GridWorker> asyncDurableBackgroundTaskWorkers = new GridConcurrentHashSet<>();
+
+ /** Count of workers that executing durable background tasks. */
+ private final AtomicInteger asyncDurableBackgroundTasksWorkersCntr = new AtomicInteger(0);
+
+ /** Durable background tasks map. */
+ private final ConcurrentHashMap<String, DurableBackgroundTask> durableBackgroundTasks = new ConcurrentHashMap<>();
+
+ /**
+ * @param ctx Kernal context.
+ */
+ public DurableBackgroundTasksProcessor(GridKernalContext ctx) {
+ super(ctx);
+ }
+
+ /**
+ * Starts the asynchronous operation of pending tasks execution. Is called on start.
+ */
+ private void asyncDurableBackgroundTasksExecution() {
+ assert durableBackgroundTasks != null;
+
+ for (DurableBackgroundTask task : durableBackgroundTasks.values())
+ asyncDurableBackgroundTaskExecute(task, false);
+ }
+
+ /**
+ * Creates a worker to execute single durable background task.
+ * @param task Task.
+ * @param dropTaskIfFailed Whether to delete task from metastorage, if it has failed.
+ */
+ private void asyncDurableBackgroundTaskExecute(DurableBackgroundTask task, boolean dropTaskIfFailed) {
+ String workerName = "async-durable-background-task-executor-" + asyncDurableBackgroundTasksWorkersCntr.getAndIncrement();
+
+ GridWorker worker = new GridWorker(ctx.igniteInstanceName(), workerName, log) {
+ @Override protected void body() {
+ try {
+ log.info("Executing durable background task: " + task.shortName());
+
+ task.execute(ctx);
+
+ log.info("Execution of durable background task completed: " + task.shortName());
+
+ removeDurableBackgroundTask(task);
+ }
+ catch (Throwable e) {
+ log.error("Could not execute durable background task: " + task.shortName(), e);
+
+ if (dropTaskIfFailed)
+ removeDurableBackgroundTask(task);
+ }
+ finally {
+ asyncDurableBackgroundTaskWorkers.remove(this);
+ }
+ }
+ };
+
+ asyncDurableBackgroundTaskWorkers.add(worker);
+
+ Thread asyncTask = new IgniteThread(worker);
+
+ asyncTask.start();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onKernalStart(boolean active) {
+ asyncDurableBackgroundTasksExecution();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onKernalStop(boolean cancel) {
+ // Waiting for workers, but not cancelling them, trying to complete running tasks.
+ awaitForWorkersStop(asyncDurableBackgroundTaskWorkers, false, log);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteCheckedException {
+ ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
+ }
+
+ /**
+ * @param msg Message.
+ */
+ public void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) {
+ if (!msg.clusterActive())
+ awaitForWorkersStop(asyncDurableBackgroundTaskWorkers, true, log);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onReadyForRead(ReadOnlyMetastorage metastorage) {
+ synchronized (metaStorageMux) {
+ if (durableBackgroundTasks.isEmpty()) {
+ try {
+ metastorage.iterate(
+ STORE_DURABLE_BACKGROUND_TASK_PREFIX,
+ (key, val) -> durableBackgroundTasks.put(key, (DurableBackgroundTask)val),
+ true
+ );
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException("Failed to iterate durable background tasks storage.", e);
+ }
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onReadyForReadWrite(ReadWriteMetastorage metastorage) {
+ synchronized (metaStorageMux) {
+ try {
+ for (Map.Entry<String, DurableBackgroundTask> entry : durableBackgroundTasks.entrySet()) {
+ if (metastorage.readRaw(entry.getKey()) == null)
+ metastorage.write(entry.getKey(), entry.getValue());
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException("Failed to read key from durable background tasks storage.", e);
+ }
+ }
+
+ this.metastorage = metastorage;
+ }
+
+ /**
+ * Builds a metastorage key for durable background task object.
+ *
+ * @param obj Object.
+ * @return Metastorage key.
+ */
+ private String durableBackgroundTaskMetastorageKey(DurableBackgroundTask obj) {
+ String k = STORE_DURABLE_BACKGROUND_TASK_PREFIX + obj.shortName();
+
+ if (k.length() > MetastorageTree.MAX_KEY_LEN) {
+ int hashLenLimit = 5;
+
+ String hash = String.valueOf(k.hashCode());
+
+ k = k.substring(0, MetastorageTree.MAX_KEY_LEN - hashLenLimit) +
+ (hash.length() > hashLenLimit ? hash.substring(0, hashLenLimit) : hash);
+ }
+
+ return k;
+ }
+
+ /**
+ * Adds durable background task object.
+ *
+ * @param obj Object.
+ */
+ private void addDurableBackgroundTask(DurableBackgroundTask obj) {
+ String objName = durableBackgroundTaskMetastorageKey(obj);
+
+ synchronized (metaStorageMux) {
+ durableBackgroundTasks.put(objName, obj);
+
+ if (metastorage != null) {
+ ctx.cache().context().database().checkpointReadLock();
+
+ try {
+ metastorage.write(objName, obj);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ finally {
+ ctx.cache().context().database().checkpointReadUnlock();
+ }
+ }
+ }
+ }
+
+ /**
+ * Removes durable background task object.
+ *
+ * @param obj Object.
+ */
+ private void removeDurableBackgroundTask(DurableBackgroundTask obj) {
+ String objName = durableBackgroundTaskMetastorageKey(obj);
+
+ synchronized (metaStorageMux) {
+ durableBackgroundTasks.remove(objName);
+
+ if (metastorage != null) {
+ ctx.cache().context().database().checkpointReadLock();
+
+ try {
+ metastorage.remove(objName);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ finally {
+ ctx.cache().context().database().checkpointReadUnlock();
+ }
+ }
+ }
+ }
+
+ /**
+ * Starts durable background task. If task is applied to persistent cache, saves it to metastorage.
+ *
+ * @param task Continuous task.
+ * @param ccfg Cache configuration.
+ */
+ public void startDurableBackgroundTask(DurableBackgroundTask task, CacheConfiguration ccfg) {
+ if (CU.isPersistentCache(ccfg, ctx.config().getDataStorageConfiguration()))
+ addDurableBackgroundTask(task);
+
+ asyncDurableBackgroundTaskExecute(task, false);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index c24b637..0d12468 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -11669,4 +11669,34 @@ public abstract class IgniteUtils {
}
}
}
+
+ /**
+ * Stops workers from given collection and waits for their completion.
+ *
+ * @param workers Workers collection.
+ * @param cancel Wheter should cancel workers.
+ * @param log Logger.
+ */
+ public static void awaitForWorkersStop(Collection<GridWorker> workers, boolean cancel, IgniteLogger log) {
+ for (GridWorker worker : workers) {
+ try {
+ if (cancel)
+ worker.cancel();
+
+ worker.join();
+ }
+ catch (Exception e) {
+ log.warning(String.format("Failed to cancel grid runnable [%s]: %s", worker.toString(), e.getMessage()));
+ }
+ }
+ }
+
+ /**
+ * Unquote the given string.
+ * @param s String.
+ * @return Unquoted string.
+ */
+ public static String unquote(String s) {
+ return s == null ? null : s.replaceAll("^\"|\"$", "");
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/ListeningTestLogger.java b/modules/core/src/test/java/org/apache/ignite/testframework/ListeningTestLogger.java
index 54f3c4b..096ef1f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/ListeningTestLogger.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/ListeningTestLogger.java
@@ -69,6 +69,18 @@ public class ListeningTestLogger implements IgniteLogger {
}
/**
+ * @param dbg If set to {@code true}, enables debug and trace log messages processing.
+ * @param echo Logger to echo all messages, limited by {@code dbg} flag.
+ * @param lsnrs LogListeners to register instantly.
+ */
+ public ListeningTestLogger(boolean dbg, @Nullable IgniteLogger echo, @NotNull LogListener... lsnrs) {
+ this(dbg, echo);
+
+ for (LogListener lsnr : lsnrs)
+ registerListener(lsnr);
+ }
+
+ /**
* Registers message listener.
*
* @param lsnr Message listener.
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DurableBackgroundCleanupIndexTreeTask.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DurableBackgroundCleanupIndexTreeTask.java
new file mode 100644
index 0000000..589e6e3
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DurableBackgroundCleanupIndexTreeTask.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.h2;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.metric.IoStatisticsHolderIndex;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.query.h2.database.H2Tree;
+import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.h2.table.IndexColumn;
+
+import static org.apache.ignite.internal.metric.IoStatisticsType.SORTED_INDEX;
+
+/**
+ * Tasks that cleans up index tree.
+ */
+public class DurableBackgroundCleanupIndexTreeTask implements DurableBackgroundTask {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private List<Long> rootPages;
+
+ /** */
+ private transient List<H2Tree> trees;
+
+ /** */
+ private String cacheGrpName;
+
+ /** */
+ private String cacheName;
+
+ /** */
+ private String schemaName;
+
+ /** */
+ private String idxName;
+
+ /** */
+ private String id;
+
+ /** */
+ public DurableBackgroundCleanupIndexTreeTask(
+ List<Long> rootPages,
+ List<H2Tree> trees,
+ String cacheGrpName,
+ String cacheName,
+ String schemaName,
+ String idxName
+ ) {
+ this.rootPages = rootPages;
+ this.trees = trees;
+ this.cacheGrpName = cacheGrpName;
+ this.cacheName = cacheName;
+ this.schemaName = schemaName;
+ this.idxName = idxName;
+ this.id = UUID.randomUUID().toString();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String shortName() {
+ return "DROP_SQL_INDEX-" + schemaName + "." + idxName + "-" + id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void execute(GridKernalContext ctx) {
+ List<H2Tree> trees0 = trees;
+
+ if (trees0 == null) {
+ trees0 = new ArrayList<>(rootPages.size());
+
+ GridCacheContext cctx = ctx.cache().context().cacheContext(CU.cacheId(cacheName));
+
+ IoStatisticsHolderIndex stats = new IoStatisticsHolderIndex(
+ SORTED_INDEX,
+ cctx.name(),
+ idxName,
+ cctx.kernalContext().metric()
+ );
+
+ for (int i = 0; i < rootPages.size(); i++) {
+ Long rootPage = rootPages.get(i);
+
+ assert rootPage != null;
+
+ // Below we create a fake index tree using it's root page, stubbing some parameters,
+ // because we just going to free memory pages that are occupied by tree structure.
+ try {
+ String treeName = "deletedTree_" + i + "_" + shortName();
+
+ H2TreeIndex.IndexColumnsInfo unwrappedColsInfo =
+ new H2TreeIndex.IndexColumnsInfo(new IndexColumn[0], new ArrayList<>(), 0, 0);
+
+ H2TreeIndex.IndexColumnsInfo wrappedColsInfo =
+ new H2TreeIndex.IndexColumnsInfo(new IndexColumn[0], new ArrayList<>(), 0, 0);
+
+ H2Tree tree = new H2Tree(
+ cctx,
+ null,
+ treeName,
+ idxName,
+ cacheName,
+ null,
+ cctx.offheap().reuseListForIndex(treeName),
+ CU.cacheGroupId(cacheName, cacheGrpName),
+ cacheGrpName,
+ cctx.dataRegion().pageMemory(),
+ ctx.cache().context().wal(),
+ cctx.offheap().globalRemoveId(),
+ rootPage,
+ false,
+ unwrappedColsInfo,
+ wrappedColsInfo,
+ new AtomicInteger(0),
+ false,
+ false,
+ false,
+ null,
+ ctx.failure(),
+ null,
+ stats
+ );
+
+ trees0.add(tree);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ }
+
+ ctx.cache().context().database().checkpointReadLock();
+
+ try {
+ for (int i = 0; i < trees0.size(); i++) {
+ BPlusTree tree = trees0.get(i);
+
+ try {
+ tree.destroy(null, true);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ }
+ finally {
+ ctx.cache().context().database().checkpointReadUnlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DurableBackgroundCleanupIndexTreeTask.class, this);
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
index 398a082..bbbf1cc 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
@@ -119,6 +119,9 @@ public class H2Utils {
*/
static final int DECIMAL_DEFAULT_PRECISION = 65535;
+ /** */
+ public static final IndexColumn[] EMPTY_COLUMNS = new IndexColumn[0];
+
/**
* The default scale for a decimal value.
*/
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
index a2a3bd6..93fe93f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
@@ -149,7 +149,7 @@ public class H2Tree extends BPlusTree<H2Row, H2Row> {
* @param stats Statistics holder.
* @throws IgniteCheckedException If failed.
*/
- protected H2Tree(
+ public H2Tree(
GridCacheContext cctx,
GridH2Table table,
String name,
@@ -670,4 +670,18 @@ public class H2Tree extends BPlusTree<H2Row, H2Row> {
return e;
}
+
+ /** {@inheritDoc} */
+ @Override protected void temporaryReleaseLock() {
+ cctx.kernalContext().cache().context().database().checkpointReadUnlock();
+ cctx.kernalContext().cache().context().database().checkpointReadLock();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long maxLockHoldTime() {
+ long sysWorkerBlockedTimeout = cctx.kernalContext().workersRegistry().getSystemWorkerBlockedTimeout();
+
+ // Using timeout value reduced by 10 times to increase possibility of lock releasing before timeout.
+ return sysWorkerBlockedTimeout == 0 ? Long.MAX_VALUE : (sysWorkerBlockedTimeout / 10);
+ }
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeClientIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeClientIndex.java
index 2f7a29a..3a043e5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeClientIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeClientIndex.java
@@ -74,7 +74,7 @@ public class H2TreeClientIndex extends H2TreeIndexBase {
private int calculateInlineSize(IndexColumn[] cols, int inlineSize, CacheConfiguration<?, ?> cacheConf) {
List<InlineIndexHelper> inlineCols = getAvailableInlineColumns(cols);
- return computeInlineSize(inlineCols, inlineSize, cacheConf);
+ return computeInlineSize(inlineCols, inlineSize, cacheConf.getSqlIndexMaxInlineSize());
}
/** {@inheritDoc} */
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
index 2ff4472..01f4af1 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -34,16 +35,23 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.metric.IoStatisticsHolder;
import org.apache.ignite.internal.metric.IoStatisticsHolderIndex;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.RootPage;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
+import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.h2.H2Cursor;
import org.apache.ignite.internal.processors.query.h2.H2RowCache;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
+import org.apache.ignite.internal.processors.query.h2.DurableBackgroundCleanupIndexTreeTask;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
@@ -144,6 +152,9 @@ public class H2TreeIndex extends H2TreeIndexBase {
}
};
+ /** Override it for test purposes. */
+ public static H2TreeFactory h2TreeFactory = H2Tree::new;
+
/** Query context registry. */
private final QueryContextRegistry qryCtxRegistry;
@@ -203,9 +214,23 @@ public class H2TreeIndex extends H2TreeIndexBase {
treeName = BPlusTree.treeName((table.rowDescriptor() == null ? "" : typeId + "_") + idxName, "H2Tree");
- IndexColumnsInfo unwrappedColsInfo = new IndexColumnsInfo(unwrappedColsList, inlineSize);
+ IndexColumn[] unwrappedCols = unwrappedColsList.toArray(H2Utils.EMPTY_COLUMNS);
+
+ IndexColumnsInfo unwrappedColsInfo = new IndexColumnsInfo(
+ unwrappedCols,
+ getAvailableInlineColumns(unwrappedCols),
+ inlineSize,
+ cctx.config().getSqlIndexMaxInlineSize()
+ );
- IndexColumnsInfo wrappedColsInfo = new IndexColumnsInfo(wrappedColsList, inlineSize);
+ IndexColumn[] wrappedCols = wrappedColsList.toArray(H2Utils.EMPTY_COLUMNS);
+
+ IndexColumnsInfo wrappedColsInfo = new IndexColumnsInfo(
+ wrappedCols,
+ getAvailableInlineColumns(wrappedCols),
+ inlineSize,
+ cctx.config().getSqlIndexMaxInlineSize()
+ );
IndexColumn[] cols;
@@ -229,7 +254,7 @@ public class H2TreeIndex extends H2TreeIndexBase {
try {
RootPage page = getMetaPage(i);
- segments[i] = new H2Tree(
+ segments[i] = h2TreeFactory.create(
cctx,
table,
treeName,
@@ -532,6 +557,47 @@ public class H2TreeIndex extends H2TreeIndexBase {
}
}
+ /** {@inheritDoc} */
+ @Override public void asyncDestroy(boolean rmvIdx) {
+ try {
+ if (cctx.affinityNode() && rmvIdx) {
+ assert cctx.shared().database().checkpointLockIsHeldByThread();
+
+ List<Long> rootPages = new ArrayList<>(segments.length);
+ List<H2Tree> trees = new ArrayList<>(segments.length);
+
+ for (int i = 0; i < segments.length; i++) {
+ H2Tree tree = segments[i];
+
+ tree.markDestroyed();
+
+ rootPages.add(tree.getMetaPageId());
+ trees.add(tree);
+
+ dropMetaPage(i);
+ }
+
+ DurableBackgroundTask task = new DurableBackgroundCleanupIndexTreeTask(
+ rootPages,
+ trees,
+ cctx.group().name(),
+ cctx.cache().name(),
+ table.getSchema().getName(),
+ idxName
+ );
+
+ cctx.kernalContext().durableBackgroundTasksProcessor().startDurableBackgroundTask(task, cctx.config());
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ finally {
+ if (msgLsnr != null)
+ ctx.io().removeMessageListener(msgTopic, msgLsnr);
+ }
+ }
+
/**
* @param segment Segment Id.
* @return Snapshot for requested segment if there is one.
@@ -895,7 +961,7 @@ public class H2TreeIndex extends H2TreeIndexBase {
*
*/
@SuppressWarnings({"PublicInnerClass", "AssignmentOrReturnOfFieldWithMutableType"})
- public class IndexColumnsInfo {
+ public static class IndexColumnsInfo {
/** */
private final int inlineSize;
@@ -906,15 +972,23 @@ public class H2TreeIndex extends H2TreeIndexBase {
private final List<InlineIndexHelper> inlineIdx;
/**
- * @param colsList Index columns list
+ * @param cols Index columns.
+ * @param inlineIdxHelpers Inline helpers for index columns.
* @param cfgInlineSize Inline size from cache config.
+ * @param maxInlineSize Max inline size.
*/
@SuppressWarnings("ZeroLengthArrayAllocation")
- public IndexColumnsInfo(List<IndexColumn> colsList, int cfgInlineSize) {
- cols = colsList.toArray(new IndexColumn[0]);
+ public IndexColumnsInfo(
+ IndexColumn[] cols,
+ List<InlineIndexHelper> inlineIdxHelpers,
+ int cfgInlineSize,
+ int maxInlineSize
+ ) {
+ this.cols = cols;
- inlineIdx = getAvailableInlineColumns(cols);
- inlineSize = computeInlineSize(inlineIdx, cfgInlineSize, cctx.config());
+ inlineIdx = inlineIdxHelpers;
+
+ inlineSize = computeInlineSize(inlineIdx, cfgInlineSize, maxInlineSize);
}
/**
@@ -938,4 +1012,37 @@ public class H2TreeIndex extends H2TreeIndexBase {
return inlineIdx;
}
}
+
+ /**
+ * Interface for {@link H2Tree} factory class.
+ */
+ public interface H2TreeFactory {
+ /** */
+ public H2Tree create(
+ GridCacheContext cctx,
+ GridH2Table table,
+ String name,
+ String idxName,
+ String cacheName,
+ String tblName,
+ ReuseList reuseList,
+ int grpId,
+ String grpName,
+ PageMemory pageMem,
+ IgniteWriteAheadLogManager wal,
+ AtomicLong globalRmvId,
+ long metaPageId,
+ boolean initNew,
+ H2TreeIndex.IndexColumnsInfo unwrappedColsInfo,
+ H2TreeIndex.IndexColumnsInfo wrappedColsInfo,
+ AtomicInteger maxCalculatedInlineSize,
+ boolean pk,
+ boolean affinityKey,
+ boolean mvccEnabled,
+ @Nullable H2RowCache rowCache,
+ @Nullable FailureProcessor failureProcessor,
+ IgniteLogger log,
+ IoStatisticsHolder stats
+ ) throws IgniteCheckedException;
+ }
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndexBase.java
index 6a17fec..d08a256 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndexBase.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
@@ -103,16 +102,17 @@ public abstract class H2TreeIndexBase extends GridH2IndexBase {
/**
* @param inlineIdxs Inline index helpers.
* @param cfgInlineSize Inline size from cache config.
- * @param cacheConf Cache configuration.
+ * @param maxInlineSize Max inline size.
* @return Inline size.
*/
- protected int computeInlineSize(List<InlineIndexHelper> inlineIdxs, int cfgInlineSize,
- CacheConfiguration<?, ?> cacheConf) {
- int confSize = cacheConf.getSqlIndexMaxInlineSize();
-
- int propSize = confSize == -1
+ protected static int computeInlineSize(
+ List<InlineIndexHelper> inlineIdxs,
+ int cfgInlineSize,
+ int maxInlineSize
+ ) {
+ int propSize = maxInlineSize == -1
? IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_MAX_INDEX_PAYLOAD_SIZE, IGNITE_MAX_INDEX_PAYLOAD_SIZE_DEFAULT)
- : confSize;
+ : maxInlineSize;
if (cfgInlineSize == 0)
return 0;
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index 6f10ad7..45850c4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -64,6 +64,15 @@ public abstract class GridH2IndexBase extends BaseIndex {
}
/**
+ * Attempts to asyncronously {@link #destroy} index and release all the resources.
+ *
+ * @param rmv Flag remove.
+ */
+ public void asyncDestroy(boolean rmv) {
+ // No-op.
+ }
+
+ /**
* @return Index segment ID for current query context.
*/
protected int threadLocalSegment() {
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index 77441e5..5768a92 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -31,13 +31,12 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteInterruptedException;
-import org.apache.ignite.cache.query.QueryRetryException;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.QueryRetryException;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
@@ -1101,7 +1100,7 @@ public class GridH2Table extends TableBase {
cctx0.shared().database().checkpointReadLock();
try {
- ((GridH2IndexBase)idx).destroy(rmIndex);
+ ((GridH2IndexBase)idx).asyncDestroy(rmIndex);
}
finally {
cctx0.shared().database().checkpointReadUnlock();
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java
new file mode 100644
index 0000000..1739a97
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java
@@ -0,0 +1,676 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.IntStream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.metric.IoStatisticsHolder;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.LongListReuseBag;
+import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
+import org.apache.ignite.internal.processors.failure.FailureProcessor;
+import org.apache.ignite.internal.processors.query.h2.H2RowCache;
+import org.apache.ignite.internal.processors.query.h2.database.H2Tree;
+import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
+import org.apache.ignite.internal.processors.query.h2.opt.H2Row;
+import org.apache.ignite.internal.util.lang.GridTuple3;
+import org.apache.ignite.internal.visor.VisorTaskArgument;
+import org.apache.ignite.internal.visor.verify.ValidateIndexesPartitionResult;
+import org.apache.ignite.internal.visor.verify.VisorValidateIndexesJobResult;
+import org.apache.ignite.internal.visor.verify.VisorValidateIndexesTask;
+import org.apache.ignite.internal.visor.verify.VisorValidateIndexesTaskArg;
+import org.apache.ignite.internal.visor.verify.VisorValidateIndexesTaskResult;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.testframework.CallbackExecutorLogListener;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.MessageOrderLogListener;
+import org.apache.ignite.testframework.junits.SystemPropertiesList;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.thread.IgniteThread;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_SYSTEM_WORKER_BLOCKED_TIMEOUT;
+
+/**
+ * Tests case when long index deletion operation happens.
+ */
+@SystemPropertiesList(
+ @WithSystemProperty(key = IGNITE_SYSTEM_WORKER_BLOCKED_TIMEOUT, value = "5000")
+)
+public class LongDestroyDurableBackgroundTaskTest extends GridCommonAbstractTest {
+ /** Nodes count. */
+ private static final int NODES_COUNT = 2;
+
+ /** Number of node that can be restarted during test, if test scenario requires it. */
+ private static final int RESTARTED_NODE_NUM = 0;
+
+ /** Number of node that is always alive during tests. */
+ private static final int ALWAYS_ALIVE_NODE_NUM = 1;
+
+ /** We imitate long index destroy in these tests, so this is delay for each page to destroy. */
+ private static final long TIME_FOR_EACH_INDEX_PAGE_TO_DESTROY = 300;
+
+ /** Index name. */
+ private static final String IDX_NAME = "T_IDX";
+
+ /** */
+ private final LogListener blockedSysCriticalThreadLsnr =
+ LogListener.matches("Blocked system-critical thread has been detected. This can lead to cluster-wide undefined behaviour [workerName=db-checkpoint-thread").build();
+
+ /** Latch that waits for execution of durable background task. */
+ private CountDownLatch pendingDelLatch;
+
+ /** Latch that waits for indexes rebuilding. */
+ private CountDownLatch idxsRebuildLatch;
+
+ /** */
+ private final LogListener pendingDelFinishedLsnr =
+ new CallbackExecutorLogListener(".*?Execution of durable background task completed.*", () -> pendingDelLatch.countDown());
+
+ /** */
+ private final LogListener idxsRebuildFinishedLsnr =
+ new CallbackExecutorLogListener("Indexes rebuilding completed for all caches.", () -> idxsRebuildLatch.countDown());
+
+ /** */
+ private final LogListener taskLifecycleListener =
+ new MessageOrderLogListener(
+ ".*?Executing durable background task: DROP_SQL_INDEX-PUBLIC." + IDX_NAME + "-.*",
+ ".*?Could not execute durable background task: DROP_SQL_INDEX-PUBLIC." + IDX_NAME + "-.*",
+ ".*?Executing durable background task: DROP_SQL_INDEX-PUBLIC." + IDX_NAME + "-.*",
+ ".*?Execution of durable background task completed: DROP_SQL_INDEX-PUBLIC." + IDX_NAME + "-.*"
+ );
+
+ /**
+ * When it is set to true during index deletion, node with number {@link #RESTARTED_NODE_NUM} fails to complete
+ * deletion.
+ */
+ private final AtomicBoolean blockDestroy = new AtomicBoolean(false);
+
+ /** */
+ private final ListeningTestLogger testLog = new ListeningTestLogger(
+ false,
+ log(),
+ blockedSysCriticalThreadLsnr,
+ pendingDelFinishedLsnr,
+ idxsRebuildFinishedLsnr,
+ taskLifecycleListener
+ );
+
+ /** */
+ private H2TreeIndex.H2TreeFactory regularH2TreeFactory;
+
+ /** */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setDataStorageConfiguration(
+ new DataStorageConfiguration().setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration()
+ .setPersistenceEnabled(true)
+ .setInitialSize(10 * 1024L * 1024L)
+ .setMaxSize(50 * 1024L * 1024L)
+ )
+ .setCheckpointFrequency(Long.MAX_VALUE / 2)
+ )
+ .setCacheConfiguration(
+ new CacheConfiguration(DEFAULT_CACHE_NAME)
+ .setBackups(1)
+ .setSqlSchema("PUBLIC")
+ )
+ .setGridLogger(testLog);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ cleanPersistenceDir();
+
+ regularH2TreeFactory = H2TreeIndex.h2TreeFactory;
+
+ H2TreeIndex.h2TreeFactory = H2TreeTest::new;
+
+ blockedSysCriticalThreadLsnr.reset();
+
+ pendingDelLatch = new CountDownLatch(1);
+ idxsRebuildLatch = new CountDownLatch(1);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ blockedSysCriticalThreadLsnr.reset();
+
+ H2TreeIndex.h2TreeFactory = regularH2TreeFactory;
+
+ stopAllGrids();
+
+ cleanPersistenceDir();
+
+ super.afterTest();
+ }
+
+ /**
+ * Tests case when long index deletion operation happens. Checkpoint should run in the middle of index deletion
+ * operation. Node can restart without fully deleted index tree.
+ *
+ * @param restart Whether do the restart of one node.
+ * @param rebalance Whether add to topology one more node while the index is being deleted.
+ * @param multicolumn Is index multicolumn.
+ * @param checkWhenOneNodeStopped Whether try to check index and try to recreate it while one node with pending
+ * task is stopped.
+ * @param dropIdxWhenOneNodeStopped Whether drop index on alive nodes while one node with pending
+ * task is stopped.
+ * @throws Exception If failed.
+ */
+ private void testLongIndexDeletion(
+ boolean restart,
+ boolean rebalance,
+ boolean multicolumn,
+ boolean checkWhenOneNodeStopped,
+ boolean dropIdxWhenOneNodeStopped
+ ) throws Exception {
+ // If not restart, then assume that index is always dropped.
+ boolean dropIdxWhenOneNodeStopped0 = !restart || dropIdxWhenOneNodeStopped;
+
+ int nodeCnt = NODES_COUNT;
+
+ Ignite ignite = prepareAndPopulateCluster(nodeCnt, multicolumn);
+
+ Ignite aliveNode = grid(ALWAYS_ALIVE_NODE_NUM);
+
+ IgniteCache<Integer, Integer> cacheOnAliveNode = aliveNode.cache(DEFAULT_CACHE_NAME);
+
+ if (rebalance) {
+ startGrid(nodeCnt);
+
+ nodeCnt++;
+
+ Collection<ClusterNode> blt = IntStream.range(0, nodeCnt)
+ .mapToObj(i -> grid(i).localNode())
+ .collect(toList());
+
+ ignite.cluster().setBaselineTopology(blt);
+ }
+
+ if (restart) {
+ blockDestroy.set(true);
+
+ stopGrid(RESTARTED_NODE_NUM, true);
+
+ awaitPartitionMapExchange();
+
+ checkSelectAndPlan(cacheOnAliveNode, false);
+
+ if (checkWhenOneNodeStopped) {
+ createIndex(cacheOnAliveNode, multicolumn);
+
+ checkSelectAndPlan(cacheOnAliveNode, true);
+
+ if (dropIdxWhenOneNodeStopped0)
+ query(cacheOnAliveNode, "drop index " + IDX_NAME);
+
+ forceCheckpoint(aliveNode);
+
+ aliveNode.cluster().active(false);
+ }
+
+ ignite = startGrid(RESTARTED_NODE_NUM);
+
+ awaitLatch(pendingDelLatch, "Test timed out: failed to await for durable background task completion.");
+
+ awaitPartitionMapExchange();
+
+ if (checkWhenOneNodeStopped) {
+ ignite.cluster().active(true);
+
+ // If index was dropped, we need to wait it's rebuild on restarted node.
+ if (!dropIdxWhenOneNodeStopped0)
+ awaitLatch(idxsRebuildLatch, "Failed to wait for indexes rebuilding.");
+ }
+
+ checkSelectAndPlan(cacheOnAliveNode, !dropIdxWhenOneNodeStopped0);
+ }
+ else
+ awaitLatch(pendingDelLatch, "Test timed out: failed to await for durable background task completion.");
+
+ IgniteCache<Integer, Integer> cache = grid(RESTARTED_NODE_NUM).cache(DEFAULT_CACHE_NAME);
+
+ checkSelectAndPlan(cache, !dropIdxWhenOneNodeStopped0);
+ checkSelectAndPlan(cacheOnAliveNode, !dropIdxWhenOneNodeStopped0);
+
+ // Trying to recreate index if it was dropped.
+ if (dropIdxWhenOneNodeStopped0)
+ createIndex(cache, multicolumn);
+
+ checkSelectAndPlan(cache, true);
+ checkSelectAndPlan(cacheOnAliveNode, true);
+
+ forceCheckpoint();
+
+ validateIndexes(ignite);
+
+ assertFalse(blockedSysCriticalThreadLsnr.check());
+ }
+
+ /**
+ * Awaits for latch for 60 seconds and fails, if latch was not counted down.
+ *
+ * @param latch Latch.
+ * @param failMsg Failure message.
+ * @throws InterruptedException If waiting failed.
+ */
+ private void awaitLatch(CountDownLatch latch, String failMsg) throws InterruptedException {
+ if (!latch.await(60, TimeUnit.SECONDS))
+ fail(failMsg);
+ }
+
+ /**
+ * Validates indexes on {@link #RESTARTED_NODE_NUM} and {@link #ALWAYS_ALIVE_NODE_NUM}.
+ *
+ * @param ignite Ignite instance.
+ */
+ private void validateIndexes(Ignite ignite) {
+ Set<UUID> nodeIds = new HashSet<UUID>() {{
+ add(grid(RESTARTED_NODE_NUM).cluster().localNode().id());
+ add(grid(ALWAYS_ALIVE_NODE_NUM).cluster().localNode().id());
+ }};
+
+ log.info("Doing indexes validation.");
+
+ VisorValidateIndexesTaskArg taskArg =
+ new VisorValidateIndexesTaskArg(Collections.singleton("SQL_PUBLIC_T"), nodeIds, 0, 1);
+
+ VisorValidateIndexesTaskResult taskRes =
+ ignite.compute().execute(VisorValidateIndexesTask.class.getName(), new VisorTaskArgument<>(nodeIds, taskArg, false));
+
+ if (!taskRes.exceptions().isEmpty()) {
+ for (Map.Entry<UUID, Exception> e : taskRes.exceptions().entrySet())
+ log.error("Exception while validation indexes on node id=" + e.getKey().toString(), e.getValue());
+ }
+
+ for (Map.Entry<UUID, VisorValidateIndexesJobResult> nodeEntry : taskRes.results().entrySet()) {
+ if (nodeEntry.getValue().hasIssues()) {
+ log.error("Validate indexes issues had been found on node id=" + nodeEntry.getKey().toString());
+
+ log.error("Integrity check failures: " + nodeEntry.getValue().integrityCheckFailures().size());
+
+ nodeEntry.getValue().integrityCheckFailures().forEach(f -> log.error(f.toString()));
+
+ logIssuesFromMap("Partition results", nodeEntry.getValue().partitionResult());
+
+ logIssuesFromMap("Index validation issues", nodeEntry.getValue().indexResult());
+ }
+ }
+
+ assertTrue(taskRes.exceptions().isEmpty());
+
+ for (VisorValidateIndexesJobResult res : taskRes.results().values())
+ assertFalse(res.hasIssues());
+ }
+
+ /**
+ * Logs index validation issues.
+ *
+ * @param caption Caption of log messages.
+ * @param map Map containing issues.
+ */
+ private void logIssuesFromMap(String caption, Map<?, ValidateIndexesPartitionResult> map) {
+ List<String> partResIssues = new LinkedList<>();
+
+ map.forEach((k, v) -> v.issues().forEach(vi -> partResIssues.add(k.toString() + ": " + vi.toString())));
+
+ log.error(caption + ": " + partResIssues.size());
+
+ partResIssues.forEach(r -> log.error(r));
+ }
+
+ /**
+ * Checks that select from table "t" is successful and correctness of index usage.
+ * Table should be already created.
+ *
+ * @param cache Cache.
+ * @param idxShouldExist Should index exist or not.
+ */
+ private void checkSelectAndPlan(IgniteCache<Integer, Integer> cache, boolean idxShouldExist) {
+ // Ensure that index is not used after it was dropped.
+ String plan = query(cache, "explain select id, p from t where p = 0")
+ .get(0).get(0).toString();
+
+ assertEquals(plan, idxShouldExist, plan.toUpperCase().contains(IDX_NAME));
+
+ // Trying to do a select.
+ String val = query(cache, "select p from t where p = 100").get(0).get(0).toString();
+
+ assertEquals("100", val);
+ }
+
+ /**
+ * Creates index on table "t", which should be already created.
+ *
+ * @param cache Cache.
+ * @param multicolumn Whether index is multicolumn.
+ */
+ private void createIndex(IgniteCache<Integer, Integer> cache, boolean multicolumn) {
+ query(cache, "create index " + IDX_NAME + " on t (p" + (multicolumn ? ", f)" : ")"));
+ }
+
+ /**
+ * Does single query.
+ *
+ * @param cache Cache.
+ * @param qry Query.
+ * @return Query result.
+ */
+ private List<List<?>> query(IgniteCache<Integer, Integer> cache, String qry) {
+ return cache.query(new SqlFieldsQuery(qry)).getAll();
+ }
+
+ /**
+ * Does parametrized query.
+ *
+ * @param cache Cache.
+ * @param qry Query.
+ * @param args Arguments.
+ * @return Query result.
+ */
+ private List<List<?>> query(IgniteCache<Integer, Integer> cache, String qry, Object... args) {
+ return cache.query(new SqlFieldsQuery(qry).setArgs(args)).getAll();
+ }
+
+ /**
+ * Starts cluster and populates with data.
+ *
+ * @param nodeCnt Nodes count.
+ * @param multicolumn Is index multicolumn.
+ * @return Ignite instance.
+ * @throws Exception If failed.
+ */
+ private IgniteEx prepareAndPopulateCluster(int nodeCnt, boolean multicolumn) throws Exception {
+ IgniteEx ignite = startGrids(nodeCnt);
+
+ ignite.cluster().active(true);
+
+ ignite.cluster().baselineAutoAdjustEnabled(false);
+
+ IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ query(cache, "create table t (id integer primary key, p integer, f integer, p integer) with \"BACKUPS=1\"");
+
+ createIndex(cache, multicolumn);
+
+ for (int i = 0; i < 5_000; i++)
+ query(cache, "insert into t (id, p, f) values (?, ?, ?)", i, i, i);
+
+ forceCheckpoint();
+
+ checkSelectAndPlan(cache, true);
+
+ final IgniteCache<Integer, Integer> finalCache = cache;
+
+ new Thread(() -> finalCache.query(new SqlFieldsQuery("drop index " + IDX_NAME)).getAll()).start();
+
+ // Waiting for some modified pages
+ doSleep(500);
+
+ // Now checkpoint will happen during index deletion before it completes.
+ forceCheckpoint();
+
+ return ignite;
+ }
+
+ /**
+ * Tests case when long index deletion operation happens. Checkpoint should run in the middle of index deletion
+ * operation.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testLongIndexDeletionSimple() throws Exception {
+ testLongIndexDeletion(false, false, false, false, true);
+ }
+
+ /**
+ * Tests case when long multicolumn index deletion operation happens. Checkpoint should run in the middle
+ * of index deletion operation.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testLongMulticolumnIndexDeletion() throws Exception {
+ testLongIndexDeletion(false, false, true, false, true);
+ }
+
+ /**
+ * Tests case when long index deletion operation happens. Checkpoint should run in the middle of index deletion
+ * operation. After checkpoint node should restart without fully deleted index tree.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testLongIndexDeletionWithRestart() throws Exception {
+ testLongIndexDeletion(true, false, false, false, true);
+ }
+
+ /**
+ * Tests case when long index deletion operation happens. Checkpoint should run in the middle of index deletion
+ * operation. After deletion start one more node should be included in topology.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testLongIndexDeletionWithRebalance() throws Exception {
+ testLongIndexDeletion(false, true, false, false, true);
+ }
+
+ /**
+ * Tests case when long index deletion operation happens. Checkpoint should run in the middle of index deletion
+ * operation. After checkpoint node should restart without fully deleted index tree. While node is stopped,
+ * we should check index and try to recreate it and do not drop again.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testLongIndexDeletionCheckWhenOneNodeStopped() throws Exception {
+ testLongIndexDeletion(true, false, false, true, false);
+ }
+
+ /**
+ * Tests case when long index deletion operation happens. Checkpoint should run in the middle of index deletion
+ * operation. After checkpoint node should restart without fully deleted index tree. While node is stopped,
+ * we should check index and try to recreate it and then drop again.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testLongIndexDeletionCheckWhenOneNodeStoppedAndDropIndex() throws Exception {
+ testLongIndexDeletion(true, false, false, true, true);
+ }
+
+ /**
+ * Tests that local task lifecycle was correct.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDestroyTaskLifecycle() throws Exception {
+ taskLifecycleListener.reset();
+
+ IgniteEx ignite = prepareAndPopulateCluster(1, false);
+
+ IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+ checkSelectAndPlan(cache, false);
+
+ ignite.cluster().active(false);
+
+ ignite.cluster().active(true);
+
+ ignite.cache(DEFAULT_CACHE_NAME).indexReadyFuture().get();
+
+ blockDestroy.set(true);
+
+ stopGrid(RESTARTED_NODE_NUM);
+
+ blockDestroy.set(false);
+
+ startGrid(RESTARTED_NODE_NUM);
+
+ awaitLatch(pendingDelLatch, "Test timed out: failed to await for durable background task completion.");
+
+ assertTrue(taskLifecycleListener.check());
+ }
+
+ /**
+ *
+ */
+ private class H2TreeTest extends H2Tree {
+ /**
+ * Constructor.
+ *
+ * @param cctx Cache context.
+ * @param table Owning table.
+ * @param name Tree name.
+ * @param idxName Name of index.
+ * @param cacheName Cache name.
+ * @param tblName Table name.
+ * @param reuseList Reuse list.
+ * @param grpId Cache group ID.
+ * @param grpName
+ * @param pageMem Page memory.
+ * @param wal Write ahead log manager.
+ * @param globalRmvId
+ * @param metaPageId Meta page ID.
+ * @param initNew Initialize new index.
+ * @param unwrappedColsInfo
+ * @param wrappedColsInfo
+ * @param maxCalculatedInlineSize
+ * @param pk {@code true} for primary key.
+ * @param affinityKey {@code true} for affinity key.
+ * @param mvccEnabled Mvcc flag.
+ * @param rowCache Row cache.
+ * @param failureProcessor if the tree is corrupted.
+ * @param log Logger.
+ * @param stats Statistics holder.
+ * @throws IgniteCheckedException If failed.
+ */
+ public H2TreeTest(
+ GridCacheContext cctx,
+ GridH2Table table,
+ String name,
+ String idxName,
+ String cacheName,
+ String tblName,
+ ReuseList reuseList,
+ int grpId,
+ String grpName,
+ PageMemory pageMem,
+ IgniteWriteAheadLogManager wal,
+ AtomicLong globalRmvId,
+ long metaPageId,
+ boolean initNew,
+ H2TreeIndex.IndexColumnsInfo unwrappedColsInfo,
+ H2TreeIndex.IndexColumnsInfo wrappedColsInfo,
+ AtomicInteger maxCalculatedInlineSize,
+ boolean pk,
+ boolean affinityKey,
+ boolean mvccEnabled,
+ @Nullable H2RowCache rowCache,
+ @Nullable FailureProcessor failureProcessor,
+ IgniteLogger log, IoStatisticsHolder stats
+ ) throws IgniteCheckedException {
+ super(
+ cctx,
+ table,
+ name,
+ idxName,
+ cacheName,
+ tblName,
+ reuseList,
+ grpId,
+ grpName,
+ pageMem,
+ wal,
+ globalRmvId,
+ metaPageId,
+ initNew,
+ unwrappedColsInfo,
+ wrappedColsInfo,
+ maxCalculatedInlineSize,
+ pk,
+ affinityKey,
+ mvccEnabled,
+ rowCache,
+ failureProcessor,
+ log,
+ stats
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long destroyDownPages(
+ LongListReuseBag bag,
+ long pageId,
+ int lvl,
+ IgniteInClosure<H2Row> c,
+ AtomicLong lockHoldStartTime,
+ long lockMaxTime,
+ Deque<GridTuple3<Long, Long, Long>> lockedPages
+ ) throws IgniteCheckedException {
+ doSleep(TIME_FOR_EACH_INDEX_PAGE_TO_DESTROY);
+
+ if (Thread.currentThread() instanceof IgniteThread) {
+ IgniteThread thread = (IgniteThread)Thread.currentThread();
+
+ if (thread.getIgniteInstanceName().endsWith(String.valueOf(RESTARTED_NODE_NUM))
+ && blockDestroy.compareAndSet(true, false))
+ throw new RuntimeException("Aborting destroy (test).");
+ }
+
+ return super.destroyDownPages(bag, pageId, lvl, c, lockHoldStartTime, lockMaxTime, lockedPages);
+ }
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
index 984d17e..0a83ae9 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
@@ -21,6 +21,7 @@ import org.apache.ignite.internal.processors.cache.IgnitePdsSingleNodeWithIndexi
import org.apache.ignite.internal.processors.cache.IgnitePdsSingleNodeWithIndexingPutGetPersistenceTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgniteTcBotInitNewPageTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IndexingMultithreadedLoadContinuousRestartTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.LongDestroyDurableBackgroundTaskTest;
import org.apache.ignite.internal.processors.database.IgniteDbMultiNodeWithIndexingPutGetTest;
import org.apache.ignite.internal.processors.database.IgniteDbSingleNodeWithIndexingPutGetTest;
import org.apache.ignite.internal.processors.database.IgniteDbSingleNodeWithIndexingWalRestoreTest;
@@ -44,7 +45,8 @@ import org.junit.runners.Suite;
IgniteTwoRegionsRebuildIndexTest.class,
IgniteTcBotInitNewPageTest.class,
RebuildIndexWithHistoricalRebalanceTest.class,
- IndexingMultithreadedLoadContinuousRestartTest.class
+ IndexingMultithreadedLoadContinuousRestartTest.class,
+ LongDestroyDurableBackgroundTaskTest.class
})
public class IgnitePdsWithIndexingTestSuite {
}