You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/07/27 14:28:42 UTC
[ignite] branch master updated: IGNITE-15026 Fix storage of
physical pageIds in a DurableBackgroundCleanupIndexTreeTask (#9207)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new a424443 IGNITE-15026 Fix storage of physical pageIds in a DurableBackgroundCleanupIndexTreeTask (#9207)
a424443 is described below
commit a424443aebfffefb873671145b2af6be3bde66a4
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Tue Jul 27 17:28:11 2021 +0300
IGNITE-15026 Fix storage of physical pageIds in a DurableBackgroundCleanupIndexTreeTask (#9207)
---
.../DurableBackgroundCleanupIndexTreeTask.java | 66 +--
.../DurableBackgroundCleanupIndexTreeTaskV2.java | 497 +++++++++++++++++++
.../sorted/defragmentation/DefragIndexFactory.java | 3 +-
.../index/sorted/inline/InlineIndexFactory.java | 3 +-
.../query/index/sorted/inline/InlineIndexImpl.java | 64 +--
.../query/index/sorted/inline/InlineIndexTree.java | 50 +-
.../ignite/internal/pagemem/PageIdAllocator.java | 2 +-
.../cache/IgniteCacheOffheapManager.java | 6 +-
.../cache/IgniteCacheOffheapManagerImpl.java | 8 +-
.../cache/persistence/GridCacheOffheapManager.java | 8 +-
.../processors/cache/persistence/IndexStorage.java | 10 +-
.../cache/persistence/IndexStorageImpl.java | 9 +-
.../processors/cache/persistence/RootPage.java | 4 +-
.../pendingtask/DurableBackgroundTask.java | 14 +-
.../pendingtask/DurableBackgroundTaskResult.java | 52 +-
.../cache/persistence/tree/BPlusTree.java | 4 +-
.../localtask/DurableBackgroundTaskState.java | 31 +-
.../localtask/DurableBackgroundTasksProcessor.java | 191 ++++---
.../processors/localtask/ConvertibleTask.java | 49 ++
.../DurableBackgroundTasksProcessorSelfTest.java | 127 ++++-
.../internal/processors/localtask/SimpleTask.java | 6 +-
.../cache/index/AbstractRebuildIndexTest.java | 28 ++
.../processors/cache/index/DropIndexTest.java | 547 +++++++++++++++++++++
.../cache/index/RenameIndexTreeTest.java | 131 ++++-
.../db/LongDestroyDurableBackgroundTaskTest.java | 180 ++++---
.../MultipleParallelCacheDeleteDeadlockTest.java | 94 ++--
.../testsuites/IgnitePdsWithIndexingTestSuite.java | 4 +-
27 files changed, 1809 insertions(+), 379 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/DurableBackgroundCleanupIndexTreeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/DurableBackgroundCleanupIndexTreeTask.java
index 00cb35e..e568dad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/DurableBackgroundCleanupIndexTreeTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/DurableBackgroundCleanupIndexTreeTask.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.cache.query.index.sorted;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
@@ -26,16 +25,14 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cache.query.index.IndexName;
-import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexKeyType;
+import org.apache.ignite.internal.cache.query.index.sorted.DurableBackgroundCleanupIndexTreeTaskV2.NoopRowHandlerFactory;
import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexTree;
-import org.apache.ignite.internal.cache.query.index.sorted.keys.IndexKey;
import org.apache.ignite.internal.metric.IoStatisticsHolderIndex;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.RootPage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult;
@@ -54,6 +51,8 @@ import static org.apache.ignite.internal.metric.IoStatisticsType.SORTED_INDEX;
/**
* Tasks that cleans up index tree.
+ *
+ * @deprecated Use {@link DurableBackgroundCleanupIndexTreeTaskV2}.
*/
public class DurableBackgroundCleanupIndexTreeTask implements DurableBackgroundTask {
/** */
@@ -226,9 +225,9 @@ public class DurableBackgroundCleanupIndexTreeTask implements DurableBackgroundT
String treeName = "deletedTree_" + i + "_" + name();
InlineIndexTree tree = new InlineIndexTree(
- null, cctx, treeName, cctx.offheap(), cctx.offheap().reuseListForIndex(treeName),
+ null, grpCtx, treeName, cctx.offheap(), cctx.offheap().reuseListForIndex(treeName),
cctx.dataRegion().pageMemory(), PageIoResolver.DEFAULT_PAGE_IO_RESOLVER,
- rootPage, false, 0, new IndexKeyTypeSettings(), null,
+ rootPage, false, 0, 0, new IndexKeyTypeSettings(), null,
stats, new NoopRowHandlerFactory(), null);
trees0.add(tree);
@@ -303,49 +302,20 @@ public class DurableBackgroundCleanupIndexTreeTask implements DurableBackgroundT
}
/** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(DurableBackgroundCleanupIndexTreeTask.class, this);
+ @Override public DurableBackgroundTask<?> convertAfterRestoreIfNeeded() {
+ return new DurableBackgroundCleanupIndexTreeTaskV2(
+ cacheGrpName,
+ cacheName,
+ idxName,
+ treeName,
+ UUID.randomUUID().toString(),
+ rootPages.size(),
+ null
+ );
}
- /** */
- private static class NoopRowHandlerFactory implements InlineIndexRowHandlerFactory {
- /** {@inheritDoc} */
- @Override public InlineIndexRowHandler create(SortedIndexDefinition sdef, IndexKeyTypeSettings keyTypeSettings) {
- return new InlineIndexRowHandler() {
- /** {@inheritDoc} */
- @Override public IndexKey indexKey(int idx, CacheDataRow row) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public List<InlineIndexKeyType> inlineIndexKeyTypes() {
- return Collections.emptyList();
- }
-
- /** {@inheritDoc} */
- @Override public List<IndexKeyDefinition> indexKeyDefinitions() {
- return Collections.emptyList();
- }
-
- @Override public IndexKeyTypeSettings indexKeyTypeSettings() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public int partition(CacheDataRow row) {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public Object cacheKey(CacheDataRow row) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public Object cacheValue(CacheDataRow row) {
- return null;
- }
- };
- }
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DurableBackgroundCleanupIndexTreeTask.class, this);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/DurableBackgroundCleanupIndexTreeTaskV2.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/DurableBackgroundCleanupIndexTreeTaskV2.java
new file mode 100644
index 0000000..6eefbc6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/DurableBackgroundCleanupIndexTreeTaskV2.java
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cache.query.index.sorted;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexKeyType;
+import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexTree;
+import org.apache.ignite.internal.cache.query.index.sorted.keys.IndexKey;
+import org.apache.ignite.internal.dto.IgniteDataTransferObject;
+import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.record.IndexRenameRootPageRecord;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.RootPage;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIoResolver;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.thread.IgniteThread;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singleton;
+
+/**
+ * Task for background cleaning of index trees.
+ */
+public class DurableBackgroundCleanupIndexTreeTaskV2 extends IgniteDataTransferObject implements
+ DurableBackgroundTask<Long> {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Index tree factory.
+ * NOTE: Change only in tests, to control the creation of trees in the task.
+ */
+ public static InlineIndexTreeFactory idxTreeFactory = new InlineIndexTreeFactory();
+
+ /** Logger. */
+ @Nullable private transient volatile IgniteLogger log;
+
+ /** Unique id. */
+ private String uid;
+
+ /** Cache group name. */
+ @Nullable private String grpName;
+
+ /** Cache name. */
+ private String cacheName;
+
+ /** Index name. */
+ private String idxName;
+
+ /** Old name of the underlying index tree. */
+ private String oldTreeName;
+
+ /** New name of the underlying index tree. */
+ private String newTreeName;
+
+ /** Number of segments. */
+ private int segments;
+
+ /** Need to rename index root pages. */
+ private transient volatile boolean needToRen;
+
+ /** Index root pages. Mapping: segment number -> index root page. */
+ private final transient Map<Integer, RootPage> rootPages = new ConcurrentHashMap<>();
+
+ /** Worker cleaning index trees. */
+ @Nullable private transient volatile GridWorker worker;
+
+ /** Total number of pages recycled from index trees. */
+ private final transient AtomicLong pageCnt = new AtomicLong();
+
+ /**
+ * Constructor.
+ *
+ * @param grpName Cache group name.
+ * @param cacheName Cache name.
+ * @param idxName Index name.
+ * @param oldTreeName Old name of underlying index tree name.
+ * @param newTreeName New name of underlying index tree name.
+ * @param segments Number of segments.
+ * @param trees Index trees.
+ */
+ public DurableBackgroundCleanupIndexTreeTaskV2(
+ @Nullable String grpName,
+ String cacheName,
+ String idxName,
+ String oldTreeName,
+ String newTreeName,
+ int segments,
+ @Nullable InlineIndexTree[] trees
+ ) {
+ uid = UUID.randomUUID().toString();
+ this.grpName = grpName;
+ this.cacheName = cacheName;
+ this.idxName = idxName;
+ this.oldTreeName = oldTreeName;
+ this.newTreeName = newTreeName;
+ this.segments = segments;
+
+ if (trees != null) {
+ assert trees.length == segments :
+ "Invalid number of index trees [trees=" + trees.length + ", segments=" + segments + ']';
+
+ this.rootPages.putAll(toRootPages(trees));
+ }
+
+ needToRen = true;
+ }
+
+ /**
+ * Default constructor for {@link Externalizable}.
+ */
+ public DurableBackgroundCleanupIndexTreeTaskV2() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeExternalData(ObjectOutput out) throws IOException {
+ U.writeLongString(out, uid);
+ U.writeLongString(out, grpName);
+ U.writeLongString(out, cacheName);
+ U.writeLongString(out, idxName);
+ U.writeLongString(out, oldTreeName);
+ U.writeLongString(out, newTreeName);
+ out.writeInt(segments);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void readExternalData(
+ byte protoVer,
+ ObjectInput in
+ ) throws IOException, ClassNotFoundException {
+ uid = U.readLongString(in);
+ grpName = U.readLongString(in);
+ cacheName = U.readLongString(in);
+ idxName = U.readLongString(in);
+ oldTreeName = U.readLongString(in);
+ newTreeName = U.readLongString(in);
+ segments = in.readInt();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return "drop-sql-index-" + cacheName + "-" + idxName + "-" + uid;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancel() {
+ rootPages.clear();
+
+ GridWorker w = worker;
+
+ if (w != null) {
+ worker = null;
+
+ U.awaitForWorkersStop(singleton(w), true, log);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<DurableBackgroundTaskResult<Long>> executeAsync(GridKernalContext ctx) {
+ assert worker == null;
+
+ log = ctx.log(DurableBackgroundCleanupIndexTreeTaskV2.class);
+
+ IgniteInternalFuture<DurableBackgroundTaskResult<Long>> outFut;
+
+ CacheGroupContext grpCtx = ctx.cache().cacheGroup(CU.cacheGroupId(cacheName, grpName));
+
+ if (grpCtx != null) {
+ try {
+ // Renaming should be done only once: when the task is added (and immediately launched) at the time the index is dropped,
+ // to avoid problems caused by a node crash between renaming and adding the task.
+ if (needToRen) {
+ // If the node crashes before renaming, then the index was definitely not dropped.
+ // If the node crashes after renaming, the task will delete the old index trees,
+ // and the node will rebuild this index when the node starts.
+ renameIndexRootPages(grpCtx, cacheName, oldTreeName, newTreeName, segments);
+
+ // After restoring from MetaStorage, it will also be {@code false}.
+ needToRen = false;
+ }
+
+ if (rootPages.isEmpty())
+ rootPages.putAll(findIndexRootPages(grpCtx, cacheName, newTreeName, segments));
+
+ if (!rootPages.isEmpty()) {
+ GridFutureAdapter<DurableBackgroundTaskResult<Long>> fut = new GridFutureAdapter<>();
+
+ GridWorker w = new GridWorker(
+ ctx.igniteInstanceName(),
+ "async-worker-" + name(),
+ log
+ ) {
+ /** {@inheritDoc} */
+ @Override protected void body() {
+ try {
+ Iterator<Map.Entry<Integer, RootPage>> it = rootPages.entrySet().iterator();
+
+ while (it.hasNext()) {
+ Map.Entry<Integer, RootPage> e = it.next();
+
+ RootPage rootPage = e.getValue();
+ int segment = e.getKey();
+
+ long pages = destroyIndexTrees(grpCtx, rootPage, cacheName, newTreeName, segment);
+
+ if (pages > 0)
+ pageCnt.addAndGet(pages);
+
+ it.remove();
+ }
+
+ fut.onDone(DurableBackgroundTaskResult.complete(pageCnt.get()));
+ }
+ catch (Throwable t) {
+ fut.onDone(DurableBackgroundTaskResult.restart(t));
+ }
+ finally {
+ worker = null;
+ }
+ }
+ };
+
+ new IgniteThread(w).start();
+
+ this.worker = w;
+
+ outFut = fut;
+ }
+ else
+ outFut = new GridFinishedFuture<>(DurableBackgroundTaskResult.complete());
+ }
+ catch (Throwable t) {
+ outFut = new GridFinishedFuture<>(DurableBackgroundTaskResult.restart(t));
+ }
+ }
+ else
+ outFut = new GridFinishedFuture<>(DurableBackgroundTaskResult.complete());
+
+ return outFut;
+ }
+
+ /**
+ * Destroying index trees.
+ *
+ * @param grpCtx Cache group context.
+ * @param rootPage Index root page.
+ * @param cacheName Cache name.
+ * @param treeName Name of the underlying index tree.
+ * @param segment Segment number.
+ * @return Total number of pages recycled from this tree.
+ * @throws IgniteCheckedException If failed.
+ */
+ public static long destroyIndexTrees(
+ CacheGroupContext grpCtx,
+ RootPage rootPage,
+ String cacheName,
+ String treeName,
+ int segment
+ ) throws IgniteCheckedException {
+ long pageCnt = 0;
+
+ grpCtx.shared().database().checkpointReadLock();
+
+ try {
+ InlineIndexTree tree = idxTreeFactory.create(grpCtx, rootPage, treeName);
+
+ pageCnt += tree.destroy(null, true);
+
+ if (grpCtx.offheap().dropRootPageForIndex(CU.cacheId(cacheName), treeName, segment) != null)
+ pageCnt++;
+ }
+ finally {
+ grpCtx.shared().database().checkpointReadUnlock();
+ }
+
+ return pageCnt;
+ }
+
+ /**
+ * Finding the root pages of the index.
+ *
+ * @param grpCtx Cache group context.
+ * @param cacheName Cache name.
+ * @param treeName Name of the underlying index tree.
+ * @param segments Number of segments.
+ * @return Index root pages. Mapping: segment number -> index root page.
+ * @throws IgniteCheckedException If failed.
+ */
+ public static Map<Integer, RootPage> findIndexRootPages(
+ CacheGroupContext grpCtx,
+ String cacheName,
+ String treeName,
+ int segments
+ ) throws IgniteCheckedException {
+ Map<Integer, RootPage> rootPages = new HashMap<>();
+
+ for (int i = 0; i < segments; i++) {
+ RootPage rootPage = grpCtx.offheap().findRootPageForIndex(CU.cacheId(cacheName), treeName, i);
+
+ if (rootPage != null)
+ rootPages.put(i, rootPage);
+ }
+
+ return rootPages;
+ }
+
+ /**
+ * Renaming the root index pages.
+ *
+ * @param grpCtx Cache group context.
+ * @param cacheName Cache name.
+ * @param oldTreeName Old name of underlying index tree name.
+ * @param newTreeName New name of underlying index tree name.
+ * @param segments Number of segments.
+ * @throws IgniteCheckedException If failed.
+ */
+ public static void renameIndexRootPages(
+ CacheGroupContext grpCtx,
+ String cacheName,
+ String oldTreeName,
+ String newTreeName,
+ int segments
+ ) throws IgniteCheckedException {
+ IgniteWriteAheadLogManager wal = grpCtx.shared().wal();
+
+ int cacheId = CU.cacheId(cacheName);
+
+ if (wal != null)
+ wal.log(new IndexRenameRootPageRecord(cacheId, oldTreeName, newTreeName, segments));
+
+ grpCtx.shared().database().checkpointReadLock();
+
+ try {
+ for (int i = 0; i < segments; i++)
+ grpCtx.offheap().renameRootPageForIndex(cacheId, oldTreeName, newTreeName, i);
+ }
+ finally {
+ grpCtx.shared().database().checkpointReadUnlock();
+ }
+ }
+
+ /**
+ * Create index root pages based on its trees.
+ *
+ * @param trees Index trees.
+ * @return Index root pages. Mapping: segment number -> index root page.
+ */
+ public static Map<Integer, RootPage> toRootPages(InlineIndexTree[] trees) {
+ if (F.isEmpty(trees))
+ return emptyMap();
+ else {
+ Map<Integer, RootPage> res = new HashMap<>();
+
+ for (int i = 0; i < trees.length; i++) {
+ InlineIndexTree tree = trees[i];
+
+ assert tree != null : "No tree for segment: " + i;
+
+ res.put(i, new RootPage(new FullPageId(tree.getMetaPageId(), tree.groupId()), tree.created()));
+ }
+ return res;
+ }
+ }
+
+ /**
+ * A do-nothing {@link InlineIndexRowHandlerFactory} implementation.
+ */
+ public static class NoopRowHandlerFactory implements InlineIndexRowHandlerFactory {
+ /** {@inheritDoc} */
+ @Override public InlineIndexRowHandler create(
+ SortedIndexDefinition sdef,
+ IndexKeyTypeSettings keyTypeSettings
+ ) {
+ return new InlineIndexRowHandler() {
+ /** {@inheritDoc} */
+ @Override public IndexKey indexKey(int idx, CacheDataRow row) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<InlineIndexKeyType> inlineIndexKeyTypes() {
+ return emptyList();
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<IndexKeyDefinition> indexKeyDefinitions() {
+ return emptyList();
+ }
+
+ @Override public IndexKeyTypeSettings indexKeyTypeSettings() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partition(CacheDataRow row) {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object cacheKey(CacheDataRow row) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object cacheValue(CacheDataRow row) {
+ return null;
+ }
+ };
+ }
+ }
+
+ /**
+ * Factory for creating index trees.
+ */
+ public static class InlineIndexTreeFactory {
+ /**
+ * Creation of an index tree.
+ *
+ * @param grpCtx Cache group context.
+ * @param rootPage Index root page.
+ * @param treeName Name of the underlying index tree.
+ * @return New index tree.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected InlineIndexTree create(
+ CacheGroupContext grpCtx,
+ RootPage rootPage,
+ String treeName
+ ) throws IgniteCheckedException {
+ return new InlineIndexTree(
+ null,
+ grpCtx,
+ treeName,
+ grpCtx.offheap(),
+ grpCtx.offheap().reuseListForIndex(treeName),
+ grpCtx.dataRegion().pageMemory(),
+ PageIoResolver.DEFAULT_PAGE_IO_RESOLVER,
+ rootPage.pageId().pageId(),
+ false,
+ 0,
+ 0,
+ new IndexKeyTypeSettings(),
+ null,
+ null,
+ new NoopRowHandlerFactory(),
+ null
+ );
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DurableBackgroundCleanupIndexTreeTaskV2.class, this);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/defragmentation/DefragIndexFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/defragmentation/DefragIndexFactory.java
index 05b0a55..4f36444 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/defragmentation/DefragIndexFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/defragmentation/DefragIndexFactory.java
@@ -83,7 +83,7 @@ public class DefragIndexFactory extends InlineIndexFactory {
InlineIndexTree tree = new InlineIndexTree(
def,
- cctx,
+ cctx.group(),
def.treeName(),
offheap,
offheap.reuseListForIndex(def.treeName()),
@@ -93,6 +93,7 @@ public class DefragIndexFactory extends InlineIndexFactory {
rootPage.pageId().pageId(),
rootPage.isAllocated(),
oldIdx.inlineSize(),
+ cctx.config().getSqlIndexMaxInlineSize(),
def.keyTypeSettings(),
null,
stats,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexFactory.java
index d3f38a6..d28858c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexFactory.java
@@ -79,7 +79,7 @@ public class InlineIndexFactory implements IndexFactory {
RootPage rootPage, IoStatisticsHolder stats, InlineRecommender recommender, int segmentNum) throws Exception {
return new InlineIndexTree(
def,
- cctx,
+ cctx.group(),
def.treeName(),
cctx.offheap(),
cctx.offheap().reuseListForIndex(def.treeName()),
@@ -88,6 +88,7 @@ public class InlineIndexFactory implements IndexFactory {
rootPage.pageId().pageId(),
rootPage.isAllocated(),
def.inlineSize(),
+ cctx.config().getSqlIndexMaxInlineSize(),
def.keyTypeSettings(),
def.idxRowCache(),
stats,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java
index ebadf4e..d09b17c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.cache.query.index.sorted.inline;
-import java.util.ArrayList;
-import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.IgniteCheckedException;
@@ -27,7 +25,7 @@ import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.internal.cache.query.index.AbstractIndex;
import org.apache.ignite.internal.cache.query.index.Index;
import org.apache.ignite.internal.cache.query.index.SingleCursor;
-import org.apache.ignite.internal.cache.query.index.sorted.DurableBackgroundCleanupIndexTreeTask;
+import org.apache.ignite.internal.cache.query.index.sorted.DurableBackgroundCleanupIndexTreeTaskV2;
import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyTypeSettings;
import org.apache.ignite.internal.cache.query.index.sorted.IndexRow;
import org.apache.ignite.internal.cache.query.index.sorted.IndexRowImpl;
@@ -45,6 +43,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.cluster.ClusterState.INACTIVE;
import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
/**
@@ -422,61 +421,36 @@ public class InlineIndexImpl extends AbstractIndex implements InlineIndex {
private final AtomicBoolean destroyed = new AtomicBoolean();
/** {@inheritDoc} */
- @Override public void destroy(boolean softDelete) {
+ @Override public void destroy(boolean softDel) {
// Already destroyed.
if (!destroyed.compareAndSet(false, true))
return;
- try {
- if (cctx.affinityNode() && !softDelete) {
- List<Long> rootPages = new ArrayList<>(segments.length);
- List<InlineIndexTree> trees = new ArrayList<>(segments.length);
-
- cctx.shared().database().checkpointReadLock();
-
- try {
- for (int i = 0; i < segments.length; i++) {
- InlineIndexTree tree = segments[i];
+ if (cctx.affinityNode() && !softDel) {
+ for (InlineIndexTree segment : segments) {
+ segment.markDestroyed();
- // Just mark it as destroyed. Actual destroy later in background task.
- tree.markDestroyed();
-
- rootPages.add(tree.getMetaPageId());
- trees.add(tree);
-
- dropMetaPage(i);
- }
- }
- finally {
- cctx.shared().database().checkpointReadUnlock();
- }
+ segment.close();
+ }
- cctx.kernalContext().metric().remove(stats.metricRegistryName());
+ cctx.kernalContext().metric().remove(stats.metricRegistryName());
+ if (cctx.group().persistenceEnabled() ||
+ cctx.shared().kernalContext().state().clusterState().state() != INACTIVE) {
// Actual destroy index task.
- DurableBackgroundTask task = new DurableBackgroundCleanupIndexTreeTask(
- rootPages,
- trees,
- cctx.group().name() == null ? cctx.cache().name() : cctx.group().name(),
- cctx.cache().name(),
- def.idxName(),
- treeName
+ DurableBackgroundTask<Long> task = new DurableBackgroundCleanupIndexTreeTaskV2(
+ cctx.group().name(),
+ cctx.name(),
+ def.idxName().idxName(),
+ treeName,
+ UUID.randomUUID().toString(),
+ segments.length,
+ segments
);
cctx.kernalContext().durableBackgroundTask().executeAsync(task, cctx.config());
}
}
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
-
- /**
- * @param segIdx Segment index.
- * @throws IgniteCheckedException If failed.
- */
- private void dropMetaPage(int segIdx) throws IgniteCheckedException {
- cctx.offheap().dropRootPageForIndex(cctx.cacheId(), treeName, segIdx);
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexTree.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexTree.java
index b90437f..6b3736d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexTree.java
@@ -44,7 +44,7 @@ import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
@@ -86,11 +86,11 @@ public class InlineIndexTree extends BPlusTree<IndexRow, IndexRow> {
/** */
private final InlineIndexRowHandler rowHnd;
- /** Cache context. */
- private final GridCacheContext<?, ?> cctx;
+ /** Cache group context. */
+ private final CacheGroupContext grpCtx;
/** Statistics holder used by underlying BPlusTree. */
- private final IoStatisticsHolder stats;
+ @Nullable private final IoStatisticsHolder stats;
/** */
private final IgniteLogger log;
@@ -106,7 +106,7 @@ public class InlineIndexTree extends BPlusTree<IndexRow, IndexRow> {
*/
public InlineIndexTree(
SortedIndexDefinition def,
- GridCacheContext<?, ?> cctx,
+ CacheGroupContext grpCtx,
String treeName,
IgniteCacheOffheapManager offheap,
ReuseList reuseList,
@@ -115,6 +115,7 @@ public class InlineIndexTree extends BPlusTree<IndexRow, IndexRow> {
long metaPageId,
boolean initNew,
int configuredInlineSize,
+ int maxInlineSize,
IndexKeyTypeSettings keyTypeSettings,
@Nullable IndexRowCache idxRowCache,
@Nullable IoStatisticsHolder stats,
@@ -123,22 +124,22 @@ public class InlineIndexTree extends BPlusTree<IndexRow, IndexRow> {
) throws IgniteCheckedException {
super(
treeName,
- cctx.groupId(),
- cctx.group().name(),
+ grpCtx.groupId(),
+ grpCtx.name(),
pageMemory,
- cctx.shared().wal(),
+ grpCtx.shared().wal(),
offheap.globalRemoveId(),
metaPageId,
reuseList,
PageIdAllocator.FLAG_IDX,
- cctx.shared().kernalContext().failure(),
- cctx.shared().diagnostic().pageLockTracker(),
+ grpCtx.shared().kernalContext().failure(),
+ grpCtx.shared().diagnostic().pageLockTracker(),
pageIoResolver
);
- this.cctx = cctx;
+ this.grpCtx = grpCtx;
- log = cctx.kernalContext().config().getGridLogger();
+ log = grpCtx.shared().kernalContext().config().getGridLogger();
this.stats = stats;
@@ -148,7 +149,7 @@ public class InlineIndexTree extends BPlusTree<IndexRow, IndexRow> {
this.idxRowCache = idxRowCache;
- mvccEnabled = cctx.mvccEnabled();
+ mvccEnabled = grpCtx.mvccEnabled();
if (!initNew) {
// Init from metastore.
@@ -177,8 +178,7 @@ public class InlineIndexTree extends BPlusTree<IndexRow, IndexRow> {
rowHnd = rowHndFactory.create(def, keyTypeSettings);
- inlineSize = computeInlineSize(
- rowHnd.inlineIndexKeyTypes(), configuredInlineSize, cctx.config().getSqlIndexMaxInlineSize());
+ inlineSize = computeInlineSize(rowHnd.inlineIndexKeyTypes(), configuredInlineSize, maxInlineSize);
setIos(inlineSize, mvccEnabled);
}
@@ -375,7 +375,7 @@ public class InlineIndexTree extends BPlusTree<IndexRow, IndexRow> {
CacheDataRowAdapter row = new CacheDataRowAdapter(link);
- row.initFromLink(cacheContext().group(), CacheDataRowAdapter.RowData.FULL, true);
+ row.initFromLink(cacheGroupContext(), CacheDataRowAdapter.RowData.FULL, true);
IndexRowImpl r = new IndexRowImpl(rowHandler(), row);
@@ -395,7 +395,7 @@ public class InlineIndexTree extends BPlusTree<IndexRow, IndexRow> {
int partId = PageIdUtils.partId(PageIdUtils.pageId(link));
MvccDataRow row = new MvccDataRow(
- cacheContext().group(),
+ cacheGroupContext(),
0,
link,
partId,
@@ -464,9 +464,13 @@ public class InlineIndexTree extends BPlusTree<IndexRow, IndexRow> {
return Math.min(PageIO.MAX_PAYLOAD_SIZE, size);
}
- /** */
- public GridCacheContext<?, ?> cacheContext() {
- return cctx;
+ /**
+ * Getting cache group context.
+ *
+ * @return Cache group context.
+ */
+ public CacheGroupContext cacheGroupContext() {
+ return grpCtx;
}
/** Default value for {@code IGNITE_MAX_INDEX_PAYLOAD_SIZE} */
@@ -588,13 +592,13 @@ public class InlineIndexTree extends BPlusTree<IndexRow, IndexRow> {
/** {@inheritDoc} */
@Override protected void temporaryReleaseLock() {
- cctx.kernalContext().cache().context().database().checkpointReadUnlock();
- cctx.kernalContext().cache().context().database().checkpointReadLock();
+ grpCtx.shared().database().checkpointReadUnlock();
+ grpCtx.shared().database().checkpointReadLock();
}
/** {@inheritDoc} */
@Override protected long maxLockHoldTime() {
- long sysWorkerBlockedTimeout = cctx.kernalContext().workersRegistry().getSystemWorkerBlockedTimeout();
+ long sysWorkerBlockedTimeout = grpCtx.shared().kernalContext().workersRegistry().getSystemWorkerBlockedTimeout();
// Using timeout value reduced by 10 times to increase possibility of lock releasing before timeout.
return sysWorkerBlockedTimeout == 0 ? Long.MAX_VALUE : (sysWorkerBlockedTimeout / 10);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java
index f945beb..1d60017 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java
@@ -68,5 +68,5 @@ public interface PageIdAllocator {
* @param grpId Cache Group ID.
* @param pageId Page ID.
*/
- public boolean freePage(int grpId, long pageId) throws IgniteCheckedException;
+ public boolean freePage(int grpId, long pageId);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 4121452..d3e839b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -563,11 +563,15 @@ public interface IgniteCacheOffheapManager {
public @Nullable RootPage findRootPageForIndex(int cacheId, String idxName, int segment) throws IgniteCheckedException;
/**
+ * Dropping the root page of the index tree.
+ *
* @param cacheId Cache ID.
* @param idxName Index name.
+ * @param segment Segment index.
+ * @return Dropped root page of the index tree, or {@code null} if no page was removed.
* @throws IgniteCheckedException If failed.
*/
- public void dropRootPageForIndex(int cacheId, String idxName, int segment) throws IgniteCheckedException;
+ @Nullable RootPage dropRootPageForIndex(int cacheId, String idxName, int segment) throws IgniteCheckedException;
/**
* Renaming the root page of the index tree.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 75b7a3e..0283673 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1119,8 +1119,12 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override public void dropRootPageForIndex(int cacheId, String idxName, int segment) throws IgniteCheckedException {
- // No-op.
+ @Override public @Nullable RootPage dropRootPageForIndex(
+ int cacheId,
+ String idxName,
+ int segment
+ ) throws IgniteCheckedException {
+ return null; // No-op.
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 33fd6bb..e1cdf2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -1071,8 +1071,12 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
- @Override public void dropRootPageForIndex(int cacheId, String idxName, int segment) throws IgniteCheckedException {
- indexStorage.dropCacheIndex(cacheId, idxName, segment);
+ @Override public @Nullable RootPage dropRootPageForIndex(
+ int cacheId,
+ String idxName,
+ int segment
+ ) throws IgniteCheckedException {
+ return indexStorage.dropCacheIndex(cacheId, idxName, segment);
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IndexStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IndexStorage.java
index dbe42d0..cc0a0c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IndexStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IndexStorage.java
@@ -62,20 +62,20 @@ public interface IndexStorage {
*
* @param cacheId Cache ID.
* @param idxName Index name.
- * @param segment Segment.
- * @return Root ID or -1 if no page was removed.
+ * @param segment Segment number.
+ * @return Root page or {@code null} if no page was removed.
* @throws IgniteCheckedException If failed.
*/
- public RootPage dropCacheIndex(Integer cacheId, String idxName, int segment) throws IgniteCheckedException;
+ @Nullable RootPage dropCacheIndex(Integer cacheId, String idxName, int segment) throws IgniteCheckedException;
/**
* Deallocate index page and remove from tree.
*
* @param idxName Index name.
- * @return Root ID or -1 if no page was removed.
+ * @return Root page or {@code null} if no page was removed.
* @throws IgniteCheckedException If failed.
*/
- public RootPage dropIndex(String idxName) throws IgniteCheckedException;
+ @Nullable RootPage dropIndex(String idxName) throws IgniteCheckedException;
/**
* Renaming the root page of the index tree.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IndexStorageImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IndexStorageImpl.java
index 3079a88..fb84d6a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IndexStorageImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IndexStorageImpl.java
@@ -188,15 +188,18 @@ public class IndexStorageImpl implements IndexStorage {
}
/** {@inheritDoc} */
- @Override public RootPage dropCacheIndex(Integer cacheId, String idxName, int segment)
- throws IgniteCheckedException {
+ @Override public @Nullable RootPage dropCacheIndex(
+ Integer cacheId,
+ String idxName,
+ int segment
+ ) throws IgniteCheckedException {
String maskedIdxName = maskCacheIndexName(cacheId, idxName, segment);
return dropIndex(maskedIdxName);
}
/** {@inheritDoc} */
- @Override public RootPage dropIndex(String idxName) throws IgniteCheckedException {
+ @Override public @Nullable RootPage dropIndex(String idxName) throws IgniteCheckedException {
byte[] idxNameBytes = idxName.getBytes(UTF_8);
IndexItem row = metaTree.remove(new IndexItem(idxNameBytes, 0));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RootPage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RootPage.java
index da9efe5..c015b8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RootPage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RootPage.java
@@ -27,10 +27,10 @@ import org.apache.ignite.internal.util.typedef.internal.S;
public class RootPage {
/** */
@GridToStringInclude
- private FullPageId pageId;
+ private final FullPageId pageId;
/** */
- private boolean allocated;
+ private final boolean allocated;
/**
* @param pageId Page ID.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTask.java
index 743f84c..09fab9e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTask.java
@@ -22,8 +22,9 @@ import org.apache.ignite.internal.IgniteInternalFuture;
/**
* Durable task that should be used to do long operations (e.g. index deletion) in background.
+ * @param <R> Type of the result of the task.
*/
-public interface DurableBackgroundTask extends Serializable {
+public interface DurableBackgroundTask<R> extends Serializable {
/**
* Getting the name of the task to identify it.
* Also used as part of a key for storage in a MetaStorage.
@@ -45,5 +46,14 @@ public interface DurableBackgroundTask extends Serializable {
* @param ctx Kernal context.
* @return Future of the tasks.
*/
- IgniteInternalFuture<DurableBackgroundTaskResult> executeAsync(GridKernalContext ctx);
+ IgniteInternalFuture<DurableBackgroundTaskResult<R>> executeAsync(GridKernalContext ctx);
+
+ /**
+ * Converting the current task to another after restoring from metaStorage.
+ *
+ * @return Converted task.
+ */
+ default DurableBackgroundTask<?> convertAfterRestoreIfNeeded() {
+ return this;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTaskResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTaskResult.java
index 5d82e60..3bb34e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTaskResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTaskResult.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
@@ -28,8 +29,9 @@ import org.jetbrains.annotations.Nullable;
* <li>{@link #completed Completed} - the task has completed its execution and should be deleted.</li>
* <li>{@link #restart Restart} - the task has not yet completed its execution and must be restarted.</li>
* </ul>
+ * @param <R> Type of the result of the task.
*/
-public class DurableBackgroundTaskResult {
+public class DurableBackgroundTaskResult<R> {
/** Completed state. */
private static final Object COMPLETED = new Object();
@@ -42,15 +44,21 @@ public class DurableBackgroundTaskResult {
/** An error occurred while executing the task. */
@Nullable private final Throwable err;
+ /** Result of the task. */
+ @GridToStringInclude
+ @Nullable private final R res;
+
/**
* Constructor.
*
- * @param res Execution state.
+ * @param state Execution state.
* @param err An error occurred while executing the task.
+ * @param res Result of the task.
*/
- private DurableBackgroundTaskResult(Object res, @Nullable Throwable err) {
- this.state = res;
+ private DurableBackgroundTaskResult(Object state, @Nullable Throwable err, @Nullable R res) {
+ this.state = state;
this.err = err;
+ this.res = res;
}
/**
@@ -59,8 +67,27 @@ public class DurableBackgroundTaskResult {
* @param err An error occurred while executing the task.
* @return Result of executing a durable background task.
*/
- public static DurableBackgroundTaskResult complete(@Nullable Throwable err) {
- return new DurableBackgroundTaskResult(COMPLETED, err);
+ public static <R> DurableBackgroundTaskResult<R> complete(@Nullable Throwable err) {
+ return new DurableBackgroundTaskResult<>(COMPLETED, err, null);
+ }
+
+ /**
+ * Creation of a completed task execution result that does not require restarting it.
+ *
+ * @param res Result of the task.
+ * @return Result of executing a durable background task.
+ */
+ public static <R> DurableBackgroundTaskResult<R> complete(@Nullable R res) {
+ return new DurableBackgroundTaskResult<>(COMPLETED, null, res);
+ }
+
+ /**
+ * Creation of a completed task execution result that does not require restarting it.
+ *
+ * @return Result of executing a durable background task.
+ */
+ public static <R> DurableBackgroundTaskResult<R> complete() {
+ return new DurableBackgroundTaskResult<>(COMPLETED, null, null);
}
/**
@@ -69,8 +96,8 @@ public class DurableBackgroundTaskResult {
* @param err An error occurred while executing the task.
* @return Result of executing a durable background task.
*/
- public static DurableBackgroundTaskResult restart(@Nullable Throwable err) {
- return new DurableBackgroundTaskResult(RESTART, err);
+ public static <R> DurableBackgroundTaskResult<R> restart(@Nullable Throwable err) {
+ return new DurableBackgroundTaskResult<>(RESTART, err, null);
}
/**
@@ -100,6 +127,15 @@ public class DurableBackgroundTaskResult {
return err;
}
+ /**
+ * Getting the result of the task.
+ *
+ * @return Result of the task.
+ */
+ @Nullable public R result() {
+ return res;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DurableBackgroundTaskResult.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index a195805..dc6aeec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -2513,7 +2513,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
* used as metadata storage, or {@code -1} if we don't have a reuse list and did not do recycling at all.
* @throws IgniteCheckedException If failed.
*/
- public final long destroy(IgniteInClosure<L> c, boolean forceDestroy) throws IgniteCheckedException {
+ public final long destroy(@Nullable IgniteInClosure<L> c, boolean forceDestroy) throws IgniteCheckedException {
close();
if (!markDestroyed() && !forceDestroy)
@@ -2591,7 +2591,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
LongListReuseBag bag,
long pageId,
int lvl,
- IgniteInClosure<L> c,
+ @Nullable IgniteInClosure<L> c,
AtomicLong lockHoldStartTime,
long lockMaxTime,
Deque<GridTuple3<Long, Long, Long>> lockedPages
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTaskState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTaskState.java
index 32ab7b9..822a43c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTaskState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTaskState.java
@@ -21,7 +21,6 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.INIT;
@@ -33,7 +32,7 @@ import static org.apache.ignite.internal.processors.localtask.DurableBackgroundT
*
* If the task needs to be restarted, it must have status INIT.
*/
-public class DurableBackgroundTaskState {
+public class DurableBackgroundTaskState<R> {
/**
* Enumeration of the current state of the task.
*/
@@ -56,10 +55,10 @@ public class DurableBackgroundTaskState {
AtomicReferenceFieldUpdater.newUpdater(DurableBackgroundTaskState.class, State.class, "state");
/** Durable background task. */
- private final DurableBackgroundTask task;
+ private final DurableBackgroundTask<R> task;
/** Outside task future. */
- @Nullable private final GridFutureAdapter<Void> outFut;
+ private final GridFutureAdapter<R> outFut;
/** Task has been saved to the MetaStorage. */
private final boolean saved;
@@ -67,6 +66,9 @@ public class DurableBackgroundTaskState {
/** Current state of the task. */
private volatile State state = INIT;
+ /** Converted from another task. */
+ private final boolean converted;
+
/**
* Constructor.
*
@@ -75,13 +77,15 @@ public class DurableBackgroundTaskState {
* @param saved Task has been saved to the MetaStorage.
*/
public DurableBackgroundTaskState(
- DurableBackgroundTask task,
- @Nullable GridFutureAdapter<Void> outFut,
- boolean saved
+ DurableBackgroundTask<R> task,
+ GridFutureAdapter<R> outFut,
+ boolean saved,
+ boolean converted
) {
this.task = task;
this.outFut = outFut;
this.saved = saved;
+ this.converted = converted;
}
/**
@@ -89,7 +93,7 @@ public class DurableBackgroundTaskState {
*
* @return Durable background task.
*/
- public DurableBackgroundTask task() {
+ public DurableBackgroundTask<R> task() {
return task;
}
@@ -98,7 +102,7 @@ public class DurableBackgroundTaskState {
*
* @return Outside task future.
*/
- @Nullable public GridFutureAdapter<Void> outFuture() {
+ public GridFutureAdapter<R> outFuture() {
return outFut;
}
@@ -140,6 +144,15 @@ public class DurableBackgroundTaskState {
return STATE_UPDATER.compareAndSet(this, exp, newState);
}
+ /**
+ * Check if the task has been converted from another.
+ *
+ * @return {@code True} if it was converted from another task.
+ */
+ public boolean converted() {
+ return converted;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DurableBackgroundTaskState.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
index 8cee313..0561e27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
@@ -61,7 +61,7 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem
private final Object metaStorageMux = new Object();
/** Current tasks. Mapping: {@link DurableBackgroundTask#name task name} -> task state. */
- private final ConcurrentMap<String, DurableBackgroundTaskState> tasks = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, DurableBackgroundTaskState<?>> tasks = new ConcurrentHashMap<>();
/** Lock for canceling tasks. */
private final ReadWriteLock cancelLock = new ReentrantReadWriteLock(true);
@@ -70,10 +70,10 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem
* Tasks to be removed from the MetaStorage after the end of a checkpoint.
* Mapping: {@link DurableBackgroundTask#name task name} -> task.
*/
- private final ConcurrentMap<String, DurableBackgroundTask> toRmv = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, DurableBackgroundTask<?>> toRmv = new ConcurrentHashMap<>();
- /** Prohibiting the execution of tasks. */
- private volatile boolean prohibitionExecTasks = true;
+ /** Prohibiting the execution of tasks. Guarded by {@link #cancelLock}. */
+ private boolean prohibitionExecTasks = true;
/** Node stop lock. */
private final GridBusyLock stopLock = new GridBusyLock();
@@ -111,9 +111,34 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem
metaStorage.iterate(
TASK_PREFIX,
(k, v) -> {
- DurableBackgroundTask t = (DurableBackgroundTask)v;
+ DurableBackgroundTask task = ((DurableBackgroundTask<?>)v);
+ DurableBackgroundTask convertedTask = task.convertAfterRestoreIfNeeded();
- tasks.put(t.name(), new DurableBackgroundTaskState(t, null, true));
+ boolean converted = false;
+
+ if (task != convertedTask) {
+ assert !task.name().equals(convertedTask.name()) :
+ "Duplicate task names [original=" + task.name() +
+ ", converted=" + convertedTask.name() + ']';
+
+ GridFutureAdapter<?> outFut = new GridFutureAdapter<>();
+ outFut.onDone();
+
+ DurableBackgroundTaskState<?> state =
+ new DurableBackgroundTaskState<>(task, outFut, true, false);
+
+ state.state(COMPLETED);
+
+ tasks.put(task.name(), state);
+
+ task = convertedTask;
+ converted = true;
+ }
+
+ tasks.put(
+ task.name(),
+ new DurableBackgroundTaskState<>(task, new GridFutureAdapter<>(), true, converted)
+ );
},
true
);
@@ -126,7 +151,27 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem
/** {@inheritDoc} */
@Override public void onReadyForReadWrite(ReadWriteMetastorage metastorage) {
- ((GridCacheDatabaseSharedManager)ctx.cache().context().database()).addCheckpointListener(this);
+ if (!stopLock.enterBusy())
+ return;
+
+ try {
+ for (DurableBackgroundTaskState<?> state : tasks.values()) {
+ if (state.converted()) {
+ metaStorageOperation(metaStorage -> {
+ assert metaStorage != null;
+
+ DurableBackgroundTask<?> task = state.task();
+
+ metaStorage.write(metaStorageKey(state.task()), task);
+ });
+ }
+ }
+
+ ((GridCacheDatabaseSharedManager)ctx.cache().context().database()).addCheckpointListener(this);
+ }
+ finally {
+ stopLock.leaveBusy();
+ }
}
/** {@inheritDoc} */
@@ -136,13 +181,13 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem
/** {@inheritDoc} */
@Override public void onMarkCheckpointBegin(Context ctx) {
- for (Iterator<DurableBackgroundTaskState> it = tasks.values().iterator(); it.hasNext(); ) {
- DurableBackgroundTaskState taskState = it.next();
+ for (Iterator<DurableBackgroundTaskState<?>> it = tasks.values().iterator(); it.hasNext(); ) {
+ DurableBackgroundTaskState<?> taskState = it.next();
if (taskState.state() == COMPLETED) {
assert taskState.saved();
- DurableBackgroundTask t = taskState.task();
+ DurableBackgroundTask<?> t = taskState.task();
toRmv.put(t.name(), t);
@@ -162,8 +207,8 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem
return;
try {
- for (Iterator<DurableBackgroundTask> it = toRmv.values().iterator(); it.hasNext(); ) {
- DurableBackgroundTask t = it.next();
+ for (Iterator<DurableBackgroundTask<?>> it = toRmv.values().iterator(); it.hasNext(); ) {
+ DurableBackgroundTask<?> t = it.next();
metaStorageOperation(metaStorage -> {
if (metaStorage != null) {
@@ -197,12 +242,17 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem
*/
public void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) {
if (msg.state() != ClusterState.INACTIVE) {
- prohibitionExecTasks = false;
+ cancelLock.writeLock().lock();
+
+ try {
+ prohibitionExecTasks = false;
- for (DurableBackgroundTaskState taskState : tasks.values()) {
- if (!prohibitionExecTasks)
+ for (DurableBackgroundTaskState<?> taskState : tasks.values())
executeAsync0(taskState.task());
}
+ finally {
+ cancelLock.writeLock().unlock();
+ }
}
}
@@ -223,19 +273,21 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem
* @param save Save task to MetaStorage.
* @return Futures that will complete when the task is completed.
*/
- public IgniteInternalFuture<Void> executeAsync(DurableBackgroundTask task, boolean save) {
+ public <R> IgniteInternalFuture<R> executeAsync(DurableBackgroundTask<R> task, boolean save) {
if (!stopLock.enterBusy())
throw new IgniteException("Node is stopping.");
try {
- DurableBackgroundTaskState taskState = tasks.compute(task.name(), (taskName, prev) -> {
- if (prev != null && prev.state() != COMPLETED) {
- throw new IllegalArgumentException("Task is already present and has not been completed: " +
- taskName);
- }
+ DurableBackgroundTaskState<?> taskState = tasks.compute(
+ task.name(),
+ (taskName, prev) -> {
+ if (prev != null && prev.state() != COMPLETED) {
+ throw new IllegalArgumentException("Task is already present and has not been completed: " +
+ taskName);
+ }
- return new DurableBackgroundTaskState(task, new GridFutureAdapter<>(), save);
- });
+ return new DurableBackgroundTaskState<>(task, new GridFutureAdapter<>(), save, false);
+ });
if (save) {
metaStorageOperation(metaStorage -> {
@@ -244,10 +296,9 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem
});
}
- if (!prohibitionExecTasks)
- executeAsync0(task);
+ executeAsync0(task);
- return taskState.outFuture();
+ return (IgniteInternalFuture<R>)taskState.outFuture();
}
finally {
stopLock.leaveBusy();
@@ -262,7 +313,7 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem
* @param cacheCfg Cache configuration.
* @return Futures that will complete when the task is completed.
*/
- public IgniteInternalFuture<Void> executeAsync(DurableBackgroundTask t, CacheConfiguration cacheCfg) {
+ public <R> IgniteInternalFuture<R> executeAsync(DurableBackgroundTask<R> t, CacheConfiguration cacheCfg) {
return executeAsync(t, CU.isPersistentCache(cacheCfg, ctx.config().getDataStorageConfiguration()));
}
@@ -271,56 +322,56 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem
*
* @param t Durable background task.
*/
- private void executeAsync0(DurableBackgroundTask t) {
+ private void executeAsync0(DurableBackgroundTask<?> t) {
cancelLock.readLock().lock();
try {
- DurableBackgroundTaskState taskState = tasks.get(t.name());
+ if (!prohibitionExecTasks) {
+ DurableBackgroundTaskState<?> taskState = tasks.get(t.name());
- if (taskState != null && taskState.state(INIT, PREPARE)) {
- if (log.isInfoEnabled())
- log.info("Executing durable background task: " + t.name());
+ if (taskState != null && taskState.state(INIT, PREPARE)) {
+ if (log.isInfoEnabled())
+ log.info("Executing durable background task: " + t.name());
- t.executeAsync(ctx).listen(f -> {
- DurableBackgroundTaskResult res = null;
+ t.executeAsync(ctx).listen(f -> {
+ DurableBackgroundTaskResult<?> res = null;
- try {
- res = f.get();
- }
- catch (Throwable e) {
- log.error("Task completed with an error: " + t.name(), e);
- }
-
- assert res != null;
+ try {
+ res = f.get();
+ }
+ catch (Throwable e) {
+ log.error("Task completed with an error: " + t.name(), e);
+ }
- if (res.error() != null)
- log.error("Could not execute durable background task: " + t.name(), res.error());
+ assert res != null;
- if (res.completed()) {
- if (res.error() == null && log.isInfoEnabled())
- log.info("Execution of durable background task completed: " + t.name());
+ if (res.error() != null)
+ log.error("Could not execute durable background task: " + t.name(), res.error());
- if (taskState.saved())
- taskState.state(COMPLETED);
- else
- tasks.remove(t.name());
+ if (res.completed()) {
+ if (res.error() == null && log.isInfoEnabled())
+ log.info("Execution of durable background task completed: " + t.name());
- GridFutureAdapter<Void> outFut = taskState.outFuture();
+ if (taskState.saved())
+ taskState.state(COMPLETED);
+ else
+ tasks.remove(t.name());
- if (outFut != null)
- outFut.onDone(res.error());
- }
- else {
- assert res.restart();
+ GridFutureAdapter<Object> outFut = (GridFutureAdapter<Object>)taskState.outFuture();
+ outFut.onDone(res.result(), res.error());
+ }
+ else {
+ assert res.restart();
- if (log.isInfoEnabled())
- log.info("Execution of durable background task will be restarted: " + t.name());
+ if (log.isInfoEnabled())
+ log.info("Execution of durable background task will be restarted: " + t.name());
- taskState.state(INIT);
- }
- });
+ taskState.state(INIT);
+ }
+ });
- taskState.state(PREPARE, STARTED);
+ taskState.state(PREPARE, STARTED);
+ }
}
}
finally {
@@ -329,19 +380,17 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem
}
/**
- * Canceling tasks that are currently being executed.
+ * Canceling tasks.
* Prohibiting the execution of tasks.
*/
private void cancelTasks() {
- prohibitionExecTasks = true;
-
cancelLock.writeLock().lock();
try {
- for (DurableBackgroundTaskState taskState : tasks.values()) {
- if (taskState.state() == STARTED)
- taskState.task().cancel();
- }
+ prohibitionExecTasks = true;
+
+ for (DurableBackgroundTaskState<?> taskState : tasks.values())
+ taskState.task().cancel();
}
finally {
cancelLock.writeLock().unlock();
@@ -381,7 +430,7 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem
* @param t Durable background task.
* @return MetaStorage {@code t} key.
*/
- static String metaStorageKey(DurableBackgroundTask t) {
+ static String metaStorageKey(DurableBackgroundTask<?> t) {
return TASK_PREFIX + t.name();
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/ConvertibleTask.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/ConvertibleTask.java
new file mode 100644
index 0000000..5adabc0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/ConvertibleTask.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.localtask;
+
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
+
+/**
+ * Task that is converted to another task after being restored from the MetaStorage.
+ */
+class ConvertibleTask extends SimpleTask {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Default constructor.
+ */
+ public ConvertibleTask() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param name Task name.
+ */
+ public ConvertibleTask(String name) {
+ super(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public DurableBackgroundTask<?> convertAfterRestoreIfNeeded() {
+ return new SimpleTask("converted-task-" + name());
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessorSelfTest.java
index be2c25d..f5cc13d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessorSelfTest.java
@@ -134,7 +134,7 @@ public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractT
IgniteInternalFuture<Void> execAsyncFut = execAsync(n, t, false);
t.onExecFut.get(getTestTimeout());
- checkStateAndMetaStorage(n, t, STARTED, false);
+ checkStateAndMetaStorage(n, t, STARTED, false, false);
assertFalse(execAsyncFut.isDone());
n.cluster().state(INACTIVE);
@@ -159,7 +159,7 @@ public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractT
execAsync(n, t, true);
t.onExecFut.get(getTestTimeout());
- checkStateAndMetaStorage(n, t, STARTED, true);
+ checkStateAndMetaStorage(n, t, STARTED, true, false);
t.taskFut.onDone(restart(null));
stopAllGrids();
@@ -170,7 +170,7 @@ public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractT
t = ((SimpleTask)tasks(n).get(t.name()).task());
t.onExecFut.get(getTestTimeout());
- checkStateAndMetaStorage(n, t, STARTED, true);
+ checkStateAndMetaStorage(n, t, STARTED, true, false);
t.taskFut.onDone(complete(null));
}
@@ -255,6 +255,7 @@ public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractT
simpleTask0.onExecFut.get(getTestTimeout());
+ forceCheckpoint();
dbMgr(n).enableCheckpoints(false).get(getTestTimeout());
simpleTask0.taskFut.onDone(DurableBackgroundTaskResult.complete(null));
@@ -280,6 +281,47 @@ public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractT
}
/**
+ * Checking the correctness of using the {@link DurableBackgroundTask#convertAfterRestoreIfNeeded}.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testConvertAfterRestoreIfNeeded() throws Exception {
+ IgniteEx n = startGrid(0);
+ n.cluster().state(ACTIVE);
+
+ String taskName0 = "test-task0";
+ String taskName1 = "test-task1";
+
+ ConvertibleTask t0 = new ConvertibleTask(taskName0);
+ SimpleTask t1 = new SimpleTask(taskName1);
+
+ SimpleTask t2 = (SimpleTask)t0.convertAfterRestoreIfNeeded();
+ assertEquals("converted-task-" + taskName0, t2.name());
+
+ assertNotNull(n.context().durableBackgroundTask().executeAsync(t0, true));
+ assertNotNull(n.context().durableBackgroundTask().executeAsync(t1, true));
+
+ assertEquals(2, tasks(n).size());
+
+ checkStateAndMetaStorage(n, t0, STARTED, true, false);
+ checkStateAndMetaStorage(n, t1, STARTED, true, false);
+
+ stopGrid(0);
+
+ n = startGrid(0);
+
+ assertEquals(3, tasks(n).size());
+
+ checkStateAndMetaStorage(n, t0, COMPLETED, true, true, false);
+ checkStateAndMetaStorage(n, t1, INIT, true, false, false);
+
+ n.cluster().state(ACTIVE);
+
+ checkStateAndMetaStorage(n, t2, STARTED, true, false, true);
+ }
+
+ /**
* Check that the task will be restarted correctly.
*
* @param save Save to MetaStorage.
@@ -293,21 +335,22 @@ public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractT
IgniteInternalFuture<Void> execAsyncFut = execAsync(n, t, save);
t.onExecFut.get(getTestTimeout());
- checkStateAndMetaStorage(n, t, STARTED, save);
+ checkStateAndMetaStorage(n, t, STARTED, save, false);
assertFalse(execAsyncFut.isDone());
if (save) {
ObservingCheckpointListener checkpointLsnr = new ObservingCheckpointListener();
dbMgr(n).addCheckpointListener(checkpointLsnr);
+ forceCheckpoint();
dbMgr(n).enableCheckpoints(false).get(getTestTimeout());
t.taskFut.onDone(restart(null));
- checkStateAndMetaStorage(n, t, INIT, true);
+ checkStateAndMetaStorage(n, t, INIT, true, false);
assertFalse(execAsyncFut.isDone());
GridFutureAdapter<Void> onMarkCheckpointBeginFut = checkpointLsnr.onMarkCheckpointBeginAsync(ctx -> {
- checkStateAndMetaStorage(n, t, INIT, true);
+ checkStateAndMetaStorage(n, t, INIT, true, false);
assertFalse(toRmv(n).containsKey(t.name()));
});
@@ -316,7 +359,7 @@ public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractT
}
else {
t.taskFut.onDone(restart(null));
- checkStateAndMetaStorage(n, t, INIT, false);
+ checkStateAndMetaStorage(n, t, INIT, false, false);
assertFalse(execAsyncFut.isDone());
}
@@ -326,7 +369,7 @@ public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractT
n.cluster().state(ACTIVE);
t.onExecFut.get(getTestTimeout());
- checkStateAndMetaStorage(n, t, STARTED, save);
+ checkStateAndMetaStorage(n, t, STARTED, save, false);
assertFalse(execAsyncFut.isDone());
t.taskFut.onDone(complete(null));
@@ -345,10 +388,10 @@ public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractT
SimpleTask t = new SimpleTask("t");
IgniteInternalFuture<Void> execAsyncFut = execAsync(n, t, save);
- checkStateAndMetaStorage(n, t, INIT, save);
+ checkStateAndMetaStorage(n, t, INIT, save, false);
checkExecuteSameTask(n, t);
- checkStateAndMetaStorage(n, t, INIT, save);
+ checkStateAndMetaStorage(n, t, INIT, save, false);
assertFalse(t.onExecFut.isDone());
assertFalse(execAsyncFut.isDone());
@@ -357,29 +400,30 @@ public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractT
t.onExecFut.get(getTestTimeout());
- checkStateAndMetaStorage(n, t, STARTED, save);
+ checkStateAndMetaStorage(n, t, STARTED, save, false);
assertFalse(execAsyncFut.isDone());
if (save) {
+ forceCheckpoint();
dbMgr(n).enableCheckpoints(false).get(getTestTimeout());
t.taskFut.onDone(complete(null));
execAsyncFut.get(getTestTimeout());
- checkStateAndMetaStorage(n, t, COMPLETED, true);
+ checkStateAndMetaStorage(n, t, COMPLETED, true, true);
ObservingCheckpointListener checkpointLsnr = new ObservingCheckpointListener();
GridFutureAdapter<Void> onMarkCheckpointBeginFut = checkpointLsnr.onMarkCheckpointBeginAsync(
ctx -> {
- checkStateAndMetaStorage(n, t, null, true);
+ checkStateAndMetaStorage(n, t, null, true, false);
assertTrue(toRmv(n).containsKey(t.name()));
}
);
GridFutureAdapter<Void> afterCheckpointEndFut = checkpointLsnr.afterCheckpointEndAsync(
ctx -> {
- checkStateAndMetaStorage(n, t, null, false);
+ checkStateAndMetaStorage(n, t, null, false, false);
assertFalse(toRmv(n).containsKey(t.name()));
}
);
@@ -394,7 +438,7 @@ public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractT
t.taskFut.onDone(complete(null));
execAsyncFut.get(getTestTimeout());
- checkStateAndMetaStorage(n, t, null, false);
+ checkStateAndMetaStorage(n, t, null, false, false);
}
}
@@ -419,25 +463,60 @@ public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractT
* @param t Task.
* @param expState Expected state of the task, {@code null} means that the task should not be.
* @param expSaved Task is expected to be stored in MetaStorage.
+ * @param expDone Whether the out future of the task is expected to be done.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void checkStateAndMetaStorage(
+ IgniteEx n,
+ DurableBackgroundTask<?> t,
+ @Nullable State expState,
+ boolean expSaved,
+ boolean expDone
+ ) throws IgniteCheckedException {
+ checkStateAndMetaStorage(n, t, expState, expSaved, expDone, null);
+ }
+
+ /**
+ * Checking the internal {@link DurableBackgroundTaskState state} of the task and storage in the MetaStorage.
+ *
+ * @param n Node.
+ * @param t Task.
+ * @param expState Expected state of the task, {@code null} means that the task state should be absent.
+ * @param expSaved Task is expected to be stored in MetaStorage.
+ * @param expDone Whether the out future of the task is expected to be done.
+ * @param expConverted Expected value of the {@link DurableBackgroundTaskState#converted()},
+ * {@code null} if no validation is required.
* @throws IgniteCheckedException If failed.
*/
private void checkStateAndMetaStorage(
IgniteEx n,
- DurableBackgroundTask t,
+ DurableBackgroundTask<?> t,
@Nullable State expState,
- boolean expSaved
+ boolean expSaved,
+ boolean expDone,
+ @Nullable Boolean expConverted
) throws IgniteCheckedException {
- DurableBackgroundTaskState taskState = tasks(n).get(t.name());
+ DurableBackgroundTaskState<?> taskState = tasks(n).get(t.name());
if (expState == null)
assertNull(taskState);
- else
+ else {
assertEquals(expState, taskState.state());
+ assertEquals(expSaved, taskState.saved());
+ assertEquals(expDone, taskState.outFuture().isDone());
+
+ if (expConverted != null)
+ assertEquals(expConverted.booleanValue(), taskState.converted());
+ }
- DurableBackgroundTask ser = (DurableBackgroundTask)metaStorageOperation(n, ms -> ms.read(metaStorageKey(t)));
+ DurableBackgroundTask<?> ser =
+ (DurableBackgroundTask<?>)metaStorageOperation(n, ms -> ms.read(metaStorageKey(t)));
- if (expSaved)
+ if (expSaved) {
assertEquals(t.name(), ser.name());
+
+ assertTrue(t.getClass().isInstance(ser));
+ }
else
assertNull(ser);
}
@@ -450,7 +529,7 @@ public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractT
* @param save Save task to MetaStorage.
* @return Task future.
*/
- private IgniteInternalFuture<Void> execAsync(IgniteEx n, DurableBackgroundTask t, boolean save) {
+ private <R> IgniteInternalFuture<R> execAsync(IgniteEx n, DurableBackgroundTask<R> t, boolean save) {
return durableBackgroundTask(n).executeAsync(t, save);
}
@@ -460,7 +539,7 @@ public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractT
* @param n Node.
* @return Map of tasks that will be removed from the MetaStorage.
*/
- private Map<String, DurableBackgroundTask> toRmv(IgniteEx n) {
+ private Map<String, DurableBackgroundTask<?>> toRmv(IgniteEx n) {
return getFieldValue(durableBackgroundTask(n), "toRmv");
}
@@ -470,7 +549,7 @@ public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractT
* @param n Node.
* @return Task states map.
*/
- private Map<String, DurableBackgroundTaskState> tasks(IgniteEx n) {
+ private Map<String, DurableBackgroundTaskState<?>> tasks(IgniteEx n) {
return getFieldValue(durableBackgroundTask(n), "tasks");
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/SimpleTask.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/SimpleTask.java
index 8d14ea4..e59880a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/SimpleTask.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/SimpleTask.java
@@ -31,7 +31,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
/**
* Simple {@link DurableBackgroundTask} implementation for tests.
*/
-class SimpleTask extends IgniteDataTransferObject implements DurableBackgroundTask {
+class SimpleTask extends IgniteDataTransferObject implements DurableBackgroundTask<Void> {
/** Serial version UID. */
private static final long serialVersionUID = 0L;
@@ -42,7 +42,7 @@ class SimpleTask extends IgniteDataTransferObject implements DurableBackgroundTa
final GridFutureAdapter<Void> onExecFut = new GridFutureAdapter<>();
/** Future that will be returned from the {@link #executeAsync}. */
- final GridFutureAdapter<DurableBackgroundTaskResult> taskFut = new GridFutureAdapter<>();
+ final GridFutureAdapter<DurableBackgroundTaskResult<Void>> taskFut = new GridFutureAdapter<>();
/** Future that will be completed at the beginning of the {@link #cancel}. */
final GridFutureAdapter<Void> onCancelFut = new GridFutureAdapter<>();
@@ -73,7 +73,7 @@ class SimpleTask extends IgniteDataTransferObject implements DurableBackgroundTa
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<DurableBackgroundTaskResult> executeAsync(GridKernalContext ctx) {
+ @Override public IgniteInternalFuture<DurableBackgroundTaskResult<Void>> executeAsync(GridKernalContext ctx) {
onExecFut.onDone();
return taskFut;
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractRebuildIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractRebuildIndexTest.java
index 7b172d8..0c5f967 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractRebuildIndexTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractRebuildIndexTest.java
@@ -31,6 +31,8 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cache.query.index.Index;
+import org.apache.ignite.internal.cache.query.index.sorted.SortedIndexDefinition;
import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.index.IndexingTestUtils.BreakBuildIndexConsumer;
@@ -48,6 +50,7 @@ import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.internal.processors.cache.index.IgniteH2IndexingEx.addIdxCreateCacheRowConsumer;
import static org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.addCacheRowConsumer;
import static org.apache.ignite.internal.processors.cache.index.IndexingTestUtils.nodeName;
+import static org.apache.ignite.testframework.GridTestUtils.cacheContext;
import static org.apache.ignite.testframework.GridTestUtils.deleteIndexBin;
import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
@@ -333,4 +336,29 @@ public abstract class AbstractRebuildIndexTest extends GridCommonAbstractTest {
return null;
}
+
+ /**
+ * Getting index definition.
+ *
+ * @param idx Index.
+ * @return Index definition.
+ */
+ protected SortedIndexDefinition indexDefinition(Index idx) {
+ return getFieldValue(idx, "def");
+ }
+
+ /**
+ * Getting the cache index.
+ *
+ * @param n Node.
+ * @param cache Cache.
+ * @param idxName Index name.
+ * @return Index.
+ */
+ @Nullable protected Index index(IgniteEx n, IgniteCache<Integer, Person> cache, String idxName) {
+ return n.context().indexProcessor().indexes(cacheContext(cache)).stream()
+ .filter(i -> idxName.equals(i.name()))
+ .findAny()
+ .orElse(null);
+ }
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DropIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DropIndexTest.java
new file mode 100644
index 0000000..2971aec
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DropIndexTest.java
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.client.Person;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cache.query.index.Index;
+import org.apache.ignite.internal.cache.query.index.sorted.DurableBackgroundCleanupIndexTreeTaskV2;
+import org.apache.ignite.internal.cache.query.index.sorted.DurableBackgroundCleanupIndexTreeTaskV2.InlineIndexTreeFactory;
+import org.apache.ignite.internal.cache.query.index.sorted.SortedIndexDefinition;
+import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexTree;
+import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.persistence.RootPage;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult;
+import org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState;
+import org.apache.ignite.internal.util.function.ThrowableFunction;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static java.util.stream.Collectors.joining;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_INDEX_PAYLOAD_SIZE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.INACTIVE;
+import static org.apache.ignite.internal.cache.query.index.sorted.DurableBackgroundCleanupIndexTreeTaskV2.destroyIndexTrees;
+import static org.apache.ignite.internal.cache.query.index.sorted.DurableBackgroundCleanupIndexTreeTaskV2.findIndexRootPages;
+import static org.apache.ignite.internal.cache.query.index.sorted.DurableBackgroundCleanupIndexTreeTaskV2.idxTreeFactory;
+import static org.apache.ignite.internal.cache.query.index.sorted.DurableBackgroundCleanupIndexTreeTaskV2.toRootPages;
+import static org.apache.ignite.testframework.GridTestUtils.cacheContext;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/**
+ * Class for testing index drop.
+ */
+@WithSystemProperty(key = IGNITE_MAX_INDEX_PAYLOAD_SIZE, value = "1000000")
+public class DropIndexTest extends AbstractRebuildIndexTest {
+ /** Original {@link DurableBackgroundCleanupIndexTreeTaskV2#idxTreeFactory}. */
+ private InlineIndexTreeFactory originalTaskIdxTreeFactory;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ originalTaskIdxTreeFactory = idxTreeFactory;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ idxTreeFactory = originalTaskIdxTreeFactory;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void populate(IgniteCache<Integer, Person> cache, int cnt) {
+ String prefix = IntStream.range(0, 1_000).mapToObj(i -> "name").collect(joining("_")) + "_";
+
+ for (int i = 0; i < cnt; i++)
+ cache.put(i, new Person(i, prefix + i));
+ }
+
+ /**
+ * Checking {@link DurableBackgroundCleanupIndexTreeTaskV2#destroyIndexTrees}.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDestroyIndexTrees() throws Exception {
+ checkDestroyIndexTrees(true, 3);
+ }
+
+ /**
+ * Check that the {@link DurableBackgroundCleanupIndexTreeTaskV2} will not
+ * be executed if the cache group and root pages are not found.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testTaskNotExecuteIfAbsentCacheGroupOrRootPages() throws Exception {
+ IgniteEx n = startGrid(0);
+
+ String fake = UUID.randomUUID().toString();
+
+ GridCacheContext<Integer, Person> cctx = cacheContext(n.cache(DEFAULT_CACHE_NAME));
+
+ List<DurableBackgroundCleanupIndexTreeTaskV2> tasks = F.asList(
+ new DurableBackgroundCleanupIndexTreeTaskV2(fake, fake, fake, fake, fake, 10, null),
+ new DurableBackgroundCleanupIndexTreeTaskV2(cctx.group().name(), cctx.name(), fake, fake, fake, 10, null)
+ );
+
+ for (DurableBackgroundCleanupIndexTreeTaskV2 task : tasks) {
+ DurableBackgroundTaskResult<Long> res = task.executeAsync(n.context()).get(0);
+
+ assertTrue(res.completed());
+ assertNull(res.error());
+ assertNull(res.result());
+ }
+ }
+
+ /**
+ * Checking that the {@link DurableBackgroundCleanupIndexTreeTaskV2} will work correctly.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testCorrectTaskExecute() throws Exception {
+ IgniteEx n = startGrid(0);
+
+ IgniteCache<Integer, Person> cache = n.cache(DEFAULT_CACHE_NAME);
+
+ populate(cache, 100);
+
+ String idxName = "IDX0";
+ createIdx(cache, idxName);
+
+ GridCacheContext<Integer, Person> cctx = cacheContext(cache);
+
+ Index idx = index(n, cache, idxName);
+
+ SortedIndexDefinition idxDef = indexDefinition(idx);
+ InlineIndexTree[] trees = segments(idx);
+
+ Map<Integer, RootPage> rootPages = toRootPages(trees);
+
+ for (int i = 0; i < trees.length; i++) {
+ InlineIndexTree tree = trees[i];
+
+ assertEquals(new FullPageId(tree.getMetaPageId(), tree.groupId()), rootPages.get(i).pageId());
+ }
+
+ String oldTreeName = idxDef.treeName();
+ String newTreeName = UUID.randomUUID().toString();
+ int segments = idxDef.segments();
+
+ assertFalse(findIndexRootPages(cctx.group(), cctx.name(), oldTreeName, segments).isEmpty());
+ assertTrue(findIndexRootPages(cctx.group(), cctx.name(), newTreeName, segments).isEmpty());
+
+ DurableBackgroundCleanupIndexTreeTaskV2 task = new DurableBackgroundCleanupIndexTreeTaskV2(
+ cctx.group().name(),
+ cctx.name(),
+ idxName,
+ oldTreeName,
+ newTreeName,
+ segments,
+ trees
+ );
+
+ assertTrue(task.name().startsWith(taskNamePrefix(cctx.name(), idxName)));
+ assertTrue(getFieldValue(task, "needToRen"));
+
+ GridFutureAdapter<Void> startFut = new GridFutureAdapter<>();
+ GridFutureAdapter<Void> endFut = new GridFutureAdapter<>();
+
+ idxTreeFactory = taskIndexTreeFactoryEx(startFut, endFut);
+
+ IgniteInternalFuture<DurableBackgroundTaskResult<Long>> taskFut = task.executeAsync(n.context());
+
+ startFut.get(getTestTimeout());
+
+ assertTrue(findIndexRootPages(cctx.group(), cctx.name(), oldTreeName, segments).isEmpty());
+ assertFalse(findIndexRootPages(cctx.group(), cctx.name(), newTreeName, segments).isEmpty());
+
+ endFut.onDone();
+
+ DurableBackgroundTaskResult<Long> res = taskFut.get(getTestTimeout());
+
+ assertTrue(res.completed());
+ assertNull(res.error());
+ assertTrue(res.result() >= 3);
+
+ assertTrue(findIndexRootPages(cctx.group(), cctx.name(), oldTreeName, segments).isEmpty());
+ assertTrue(findIndexRootPages(cctx.group(), cctx.name(), newTreeName, segments).isEmpty());
+
+ assertFalse(getFieldValue(task, "needToRen"));
+ }
+
+ /**
+ * Checking that the {@link DurableBackgroundCleanupIndexTreeTaskV2} will
+ * run when the index drop is called.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testExecuteTaskOnDropIdx() throws Exception {
+ checkExecuteTask(true, 3L);
+ }
+
+ /**
+ * Checking that when the node is restarted, the
+ * {@link DurableBackgroundCleanupIndexTreeTaskV2} will finish correctly.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testExecuteTaskOnDropIdxAfterRestart() throws Exception {
+ checkExecuteTaskAfterRestart(true, 3L, n -> {
+ // Disable auto activation.
+ n.cluster().baselineAutoAdjustEnabled(false);
+ stopGrid(0);
+
+ n = startGrid(0, cfg -> {
+ cfg.setClusterStateOnStart(INACTIVE);
+ });
+
+ return n;
+ });
+ }
+
+ /**
+ * Checking that when the node is re-activated, the
+ * {@link DurableBackgroundCleanupIndexTreeTaskV2} will finish correctly.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testExecuteTaskOnDropIdxAfterReActivated() throws Exception {
+ checkExecuteTaskAfterRestart(true, 3L, n -> {
+ n.cluster().state(INACTIVE);
+
+ return n;
+ });
+ }
+
+ /**
+ * Checking that the {@link DurableBackgroundCleanupIndexTreeTaskV2} will
+ * work correctly for in-memory cache.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testExecuteTaskOnDropIdxForInMemory() throws Exception {
+ checkExecuteTask(false, 2L);
+ }
+
+ /**
+ * Checking that when the node is re-activated, the
+ * {@link DurableBackgroundCleanupIndexTreeTaskV2} will finish correctly for in-memory cache.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testExecuteTaskOnDropIdxAfterReActivatedForInMemory() throws Exception {
+ checkExecuteTaskAfterRestart(false, null, n -> {
+ n.cluster().state(INACTIVE);
+
+ return n;
+ });
+ }
+
+ /**
+ * Checking {@link DurableBackgroundCleanupIndexTreeTaskV2#destroyIndexTrees} for in-memory cache.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDestroyIndexTreesForInMemory() throws Exception {
+ checkDestroyIndexTrees(false, 2);
+ }
+
+ /**
+ * Checks that {@link DurableBackgroundCleanupIndexTreeTaskV2} will not be
+ * added when the cluster is deactivated for in-memory caches.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDonotAddTaskOnDeactivateForInMemory() throws Exception {
+ IgniteEx n = startGrid(0, cfg -> {
+ cfg.getDataStorageConfiguration().getDefaultDataRegionConfiguration().setPersistenceEnabled(false);
+ });
+
+ n.cluster().state(ACTIVE);
+
+ IgniteCache<Integer, Person> cache = n.cache(DEFAULT_CACHE_NAME);
+
+ populate(cache, 100);
+
+ String idxName = "IDX0";
+ createIdx(cache, idxName);
+
+ n.cluster().state(INACTIVE);
+
+ assertTrue(tasks(n).isEmpty());
+ }
+
+ /**
+ * Getting index trees.
+ *
+ * @param idx Index.
+ * @return Index trees.
+ */
+ private InlineIndexTree[] segments(Index idx) {
+ return getFieldValue(idx, "segments");
+ }
+
+ /**
+ * Getting task state.
+ *
+ * @param n Node.
+ * @param taskNamePrefix Task name prefix.
+ * @return Task state.
+ */
+ @Nullable private DurableBackgroundTaskState<?> taskState(IgniteEx n, String taskNamePrefix) {
+ return tasks(n).entrySet().stream()
+ .filter(e -> e.getKey().startsWith(taskNamePrefix)).map(Map.Entry::getValue).findAny().orElse(null);
+ }
+
+ /**
+ * Getting {@code DurableBackgroundTasksProcessor#tasks}.
+ *
+ * @return Task states by task name.
+ */
+ private Map<String, DurableBackgroundTaskState<?>> tasks(IgniteEx n) {
+ return getFieldValue(n.context().durableBackgroundTask(), "tasks");
+ }
+
+ /**
+ * Getting {@link DurableBackgroundCleanupIndexTreeTaskV2} name prefix.
+ *
+ * @param cacheName Cache name.
+ * @param idxName Index name.
+ * @return Task name prefix.
+ */
+ private String taskNamePrefix(String cacheName, String idxName) {
+ return "drop-sql-index-" + cacheName + "-" + idxName + "-";
+ }
+
+ /**
+ * Creating an extension for {@link InlineIndexTreeFactory}.
+ *
+ * @param startFut Future to indicate that the tree for the task has begun to be created.
+ * @param endFut Future to wait for the continuation of the tree creation for the task.
+ * @return Extension of the {@link InlineIndexTreeFactory}.
+ */
+ private InlineIndexTreeFactory taskIndexTreeFactoryEx(
+ GridFutureAdapter<Void> startFut,
+ GridFutureAdapter<Void> endFut
+ ) {
+ return new InlineIndexTreeFactory() {
+ /** {@inheritDoc} */
+ @Override protected InlineIndexTree create(
+ CacheGroupContext grpCtx,
+ RootPage rootPage,
+ String treeName
+ ) throws IgniteCheckedException {
+ startFut.onDone();
+
+ endFut.get(getTestTimeout());
+
+ return super.create(grpCtx, rootPage, treeName);
+ }
+ };
+ }
+
+ /**
+ * Drops an index of the {@link Person} cache.
+ * SQL: {@code DROP INDEX idxName}
+ *
+ * @param cache Cache.
+ * @param idxName Index name.
+ * @return Result rows of the {@code DROP INDEX} query.
+ */
+ private List<List<?>> dropIdx(IgniteCache<Integer, Person> cache, String idxName) {
+ return cache.query(new SqlFieldsQuery("DROP INDEX " + idxName)).getAll();
+ }
+
+ /**
+ * Check that the {@link DurableBackgroundCleanupIndexTreeTaskV2} will be completed successfully.
+ *
+ * @param persistent Persistent default data region.
+ * @param expRes Lower bound for the task result: the actual result must be greater than or equal to it.
+ * @throws Exception If failed.
+ */
+ private void checkExecuteTask(
+ boolean persistent,
+ long expRes
+ ) throws Exception {
+ IgniteEx n = startGrid(0, cfg -> {
+ cfg.getDataStorageConfiguration().getDefaultDataRegionConfiguration().setPersistenceEnabled(persistent);
+ });
+
+ n.cluster().state(ACTIVE);
+
+ IgniteCache<Integer, Person> cache = n.cache(DEFAULT_CACHE_NAME);
+
+ populate(cache, 100);
+
+ String idxName = "IDX0";
+ createIdx(cache, idxName);
+
+ GridFutureAdapter<Void> startFut = new GridFutureAdapter<>();
+ GridFutureAdapter<Void> endFut = new GridFutureAdapter<>();
+
+ idxTreeFactory = taskIndexTreeFactoryEx(startFut, endFut);
+
+ IgniteInternalFuture<List<List<?>>> dropIdxFut = runAsync(() -> dropIdx(cache, idxName));
+
+ startFut.get(getTestTimeout());
+
+ GridFutureAdapter<?> taskFut = taskState(n, taskNamePrefix(DEFAULT_CACHE_NAME, idxName)).outFuture();
+ assertFalse(taskFut.isDone());
+
+ endFut.onDone();
+
+ assertTrue((Long)taskFut.get(getTestTimeout()) >= expRes);
+
+ dropIdxFut.get(getTestTimeout());
+ }
+
+ /**
+ * Check that after restart / reactivation of the node,
+ * the {@link DurableBackgroundCleanupIndexTreeTaskV2} will be completed successfully.
+ *
+ * @param persistent Persistent default data region.
+ * @param expRes Lower bound for the task result, or {@code null} if the task is expected to produce no result.
+ * @param restartFun Node restart/reactivation function.
+ * @throws Exception If failed.
+ */
+ private void checkExecuteTaskAfterRestart(
+ boolean persistent,
+ @Nullable Long expRes,
+ ThrowableFunction<IgniteEx, IgniteEx, Exception> restartFun
+ ) throws Exception {
+ IgniteEx n = startGrid(0, cfg -> {
+ cfg.getDataStorageConfiguration().getDefaultDataRegionConfiguration().setPersistenceEnabled(persistent);
+ });
+
+ n.cluster().state(ACTIVE);
+
+ IgniteCache<Integer, Person> cache = n.cache(DEFAULT_CACHE_NAME);
+
+ populate(cache, 100);
+
+ String idxName = "IDX0";
+ createIdx(cache, idxName);
+
+ GridFutureAdapter<Void> startFut = new GridFutureAdapter<>();
+ GridFutureAdapter<Void> endFut = new GridFutureAdapter<>();
+
+ idxTreeFactory = taskIndexTreeFactoryEx(startFut, endFut);
+
+ endFut.onDone(new IgniteCheckedException("Stop drop idx"));
+
+ // Dropping the index will succeed, but destroying its trees will not.
+ dropIdx(cache, idxName);
+
+ String taskNamePrefix = taskNamePrefix(cacheContext(cache).name(), idxName);
+ assertFalse(taskState(n, taskNamePrefix).outFuture().isDone());
+
+ n = restartFun.apply(n);
+
+ idxTreeFactory = originalTaskIdxTreeFactory;
+
+ GridFutureAdapter<?> taskFut = taskState(n, taskNamePrefix).outFuture();
+ assertFalse(taskFut.isDone());
+
+ n.cluster().state(ACTIVE);
+
+ if (expRes == null)
+ assertNull(taskFut.get(getTestTimeout()));
+ else
+ assertTrue((Long)taskFut.get(getTestTimeout()) >= expRes);
+ }
+
+ /**
+ * Checking {@link DurableBackgroundCleanupIndexTreeTaskV2#destroyIndexTrees}.
+ *
+ * @param persistent Persistent default data region.
+ * @param expRes Lower bound for the total returned by {@code destroyIndexTrees} over all root pages.
+ * @throws Exception If failed.
+ */
+ private void checkDestroyIndexTrees(
+ boolean persistent,
+ long expRes
+ ) throws Exception {
+ IgniteEx n = startGrid(0, cfg -> {
+ cfg.getDataStorageConfiguration().getDefaultDataRegionConfiguration().setPersistenceEnabled(persistent);
+ });
+
+ n.cluster().state(ACTIVE);
+
+ IgniteCache<Integer, Person> cache = n.cache(DEFAULT_CACHE_NAME);
+
+ populate(cache, 100);
+
+ String idxName = "IDX0";
+ createIdx(cache, idxName);
+
+ GridCacheContext<Integer, Person> cctx = cacheContext(cache);
+
+ Index idx = index(n, cache, idxName);
+
+ SortedIndexDefinition idxDef = indexDefinition(idx);
+
+ String treeName = idxDef.treeName();
+ int segments = idxDef.segments();
+
+ Map<Integer, RootPage> rootPages = new HashMap<>();
+
+ if (persistent)
+ rootPages.putAll(findIndexRootPages(cctx.group(), cctx.name(), treeName, segments));
+ else
+ rootPages.putAll(toRootPages(segments(idx)));
+
+ assertFalse(rootPages.isEmpty());
+
+ // Emulating worker cancellation, let's make sure it doesn't cause problems.
+ Thread.currentThread().interrupt();
+
+ long pageCnt = 0;
+
+ for (Map.Entry<Integer, RootPage> e : rootPages.entrySet())
+ pageCnt += destroyIndexTrees(cctx.group(), e.getValue(), cctx.name(), treeName, e.getKey());
+
+ assertTrue(pageCnt >= expRes);
+ assertTrue(findIndexRootPages(cctx.group(), cctx.name(), treeName, segments).isEmpty());
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/RenameIndexTreeTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/RenameIndexTreeTest.java
index 2671048..a7dbee5 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/RenameIndexTreeTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/RenameIndexTreeTest.java
@@ -20,26 +20,37 @@ package org.apache.ignite.internal.processors.cache.index;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.client.Person;
import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.cache.query.index.Index;
+import org.apache.ignite.internal.cache.query.index.sorted.DurableBackgroundCleanupIndexTreeTaskV2;
import org.apache.ignite.internal.cache.query.index.sorted.SortedIndexDefinition;
+import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.record.IndexRenameRootPageRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.persistence.RootPage;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.junit.Test;
+import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
+import static java.util.stream.StreamSupport.stream;
+import static org.apache.ignite.internal.cache.query.index.sorted.DurableBackgroundCleanupIndexTreeTaskV2.findIndexRootPages;
+import static org.apache.ignite.internal.cache.query.index.sorted.DurableBackgroundCleanupIndexTreeTaskV2.renameIndexRootPages;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.INDEX_ROOT_PAGE_RENAME_RECORD;
import static org.apache.ignite.internal.processors.cache.persistence.IndexStorageImpl.MAX_IDX_NAME_LEN;
import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
import static org.apache.ignite.testframework.GridTestUtils.cacheContext;
-import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
/**
* Class for testing index tree renaming.
@@ -202,6 +213,69 @@ public class RenameIndexTreeTest extends AbstractRebuildIndexTest {
}
/**
+ * Checking the correctness of {@link DurableBackgroundCleanupIndexTreeTaskV2#findIndexRootPages}
+ * and {@link DurableBackgroundCleanupIndexTreeTaskV2#renameIndexRootPages}.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRenameFromTask() throws Exception {
+ IgniteEx n = startGrid(0);
+
+ IgniteCache<Integer, Person> cache = n.cache(DEFAULT_CACHE_NAME);
+
+ populate(cache, 100);
+
+ String idxName = "IDX0";
+ createIdx(cache, idxName);
+
+ SortedIndexDefinition idxDef = indexDefinition(index(n, cache, idxName));
+
+ GridCacheContext<Integer, Person> cctx = cacheContext(cache);
+
+ String oldTreeName = idxDef.treeName();
+ int segments = idxDef.segments();
+
+ assertExistIndexRoot(cache, oldTreeName, segments, true);
+
+ Map<Integer, RootPage> rootPages0 = findIndexRoots(cache, oldTreeName, segments);
+ Map<Integer, RootPage> rootPages1 = findIndexRootPages(cctx.group(), cctx.name(), oldTreeName, segments);
+
+ assertEqualsCollections(toPageIds(rootPages0), toPageIds(rootPages1));
+
+ long currSegIdx = walMgr(n).currentSegment();
+
+ String newTreeName = UUID.randomUUID().toString();
+ renameIndexRootPages(cctx.group(), cctx.name(), oldTreeName, newTreeName, segments);
+
+ assertExistIndexRoot(cache, oldTreeName, segments, false);
+ assertExistIndexRoot(cache, newTreeName, segments, true);
+
+ assertTrue(findIndexRootPages(cctx.group(), cctx.name(), oldTreeName, segments).isEmpty());
+
+ rootPages0 = findIndexRoots(cache, newTreeName, segments);
+ rootPages1 = findIndexRootPages(cctx.group(), cctx.name(), newTreeName, segments);
+
+ assertEqualsCollections(toPageIds(rootPages0), toPageIds(rootPages1));
+
+ WALPointer start = new WALPointer(currSegIdx, 0, 0);
+ IgniteBiPredicate<WALRecord.RecordType, WALPointer> pred = (t, p) -> t == INDEX_ROOT_PAGE_RENAME_RECORD;
+
+ try (WALIterator it = walMgr(n).replay(start, pred)) {
+ List<WALRecord> records = stream(it.spliterator(), false).map(IgniteBiTuple::get2).collect(toList());
+
+ assertEquals(1, records.size());
+
+ IndexRenameRootPageRecord record = (IndexRenameRootPageRecord)records.get(0);
+
+ assertEquals(cctx.cacheId(), record.cacheId());
+ assertEquals(oldTreeName, record.oldTreeName());
+ assertEquals(newTreeName, record.newTreeName());
+ assertEquals(segments, record.segments());
+ }
+ }
+
+ /**
* Renaming index trees.
*
* @param cache Cache.
@@ -258,37 +332,48 @@ public class RenameIndexTreeTest extends AbstractRebuildIndexTest {
int segments,
boolean expExist
) throws Exception {
+ Map<Integer, RootPage> rootPages = findIndexRoots(cache, treeName, segments);
+
+ for (int i = 0; i < segments; i++)
+ assertEquals(expExist, rootPages.containsKey(i));
+ }
+
+ /**
+ * Finding index root pages.
+ *
+ * @param cache Cache.
+ * @param treeName Index tree name.
+ * @param segments Segment count.
+ * @return Mapping: segment number -> index root page.
+ * @throws Exception If failed.
+ */
+ private Map<Integer, RootPage> findIndexRoots(
+ IgniteCache<Integer, Person> cache,
+ String treeName,
+ int segments
+ ) throws Exception {
GridCacheContext<Integer, Person> cacheCtx = cacheContext(cache);
+ Map<Integer, RootPage> rootPages = new HashMap<>();
+
for (int i = 0; i < segments; i++) {
RootPage rootPage = cacheCtx.offheap().findRootPageForIndex(cacheCtx.cacheId(), treeName, i);
- assertEquals(expExist, rootPage != null);
+ if (rootPage != null)
+ rootPages.put(i, rootPage);
}
- }
- /**
- * Getting index description.
- *
- * @param idx Index.
- * @return Index description.
- */
- private SortedIndexDefinition indexDefinition(Index idx) {
- return getFieldValue(idx, "def");
+ return rootPages;
}
/**
- * Getting the cache index.
+ * Getting page ids.
*
- * @param n Node.
- * @param cache Cache.
- * @param idxName Index name.
- * @return Index.
+ * @param rootPages Mapping: segment number -> index root page.
+ * @return Page ids sorted by segment numbers.
*/
- @Nullable private Index index(IgniteEx n, IgniteCache<Integer, Person> cache, String idxName) {
- return n.context().indexProcessor().indexes(cacheContext(cache)).stream()
- .filter(i -> idxName.equals(i.name()))
- .findAny()
- .orElse(null);
+ private Collection<FullPageId> toPageIds(Map<Integer, RootPage> rootPages) {
+ return rootPages.entrySet().stream()
+ .sorted(Map.Entry.comparingByKey()).map(e -> e.getValue().pageId()).collect(toList());
}
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java
index b24ddee..abff7e3 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java
@@ -49,17 +49,21 @@ import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.cache.query.index.IndexName;
+import org.apache.ignite.internal.cache.query.index.sorted.DurableBackgroundCleanupIndexTreeTask;
+import org.apache.ignite.internal.cache.query.index.sorted.DurableBackgroundCleanupIndexTreeTaskV2;
+import org.apache.ignite.internal.cache.query.index.sorted.DurableBackgroundCleanupIndexTreeTaskV2.InlineIndexTreeFactory;
+import org.apache.ignite.internal.cache.query.index.sorted.DurableBackgroundCleanupIndexTreeTaskV2.NoopRowHandlerFactory;
import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyTypeSettings;
import org.apache.ignite.internal.cache.query.index.sorted.IndexRow;
import org.apache.ignite.internal.cache.query.index.sorted.IndexRowCache;
import org.apache.ignite.internal.cache.query.index.sorted.InlineIndexRowHandlerFactory;
import org.apache.ignite.internal.cache.query.index.sorted.SortedIndexDefinition;
-import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexFactory;
import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexTree;
import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineRecommender;
import org.apache.ignite.internal.metric.IoStatisticsHolder;
import org.apache.ignite.internal.pagemem.PageMemory;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
@@ -71,8 +75,8 @@ import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendi
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIoResolver;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.LongListReuseBag;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
-import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.util.lang.GridTuple3;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.visor.VisorTaskArgument;
import org.apache.ignite.internal.visor.verify.ValidateIndexesPartitionResult;
import org.apache.ignite.internal.visor.verify.VisorValidateIndexesJobResult;
@@ -89,8 +93,14 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.thread.IgniteThread;
import org.junit.Test;
+import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SYSTEM_WORKER_BLOCKED_TIMEOUT;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.INACTIVE;
+import static org.apache.ignite.internal.cache.query.index.sorted.DurableBackgroundCleanupIndexTreeTaskV2.idxTreeFactory;
+import static org.apache.ignite.internal.processors.query.QueryUtils.DFLT_SCHEMA;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
/**
* Tests case when long index deletion operation happens.
@@ -124,20 +134,24 @@ public class LongDestroyDurableBackgroundTaskTest extends GridCommonAbstractTest
private CountDownLatch idxsRebuildLatch;
/** */
- private final LogListener pendingDelFinishedLsnr =
- new CallbackExecutorLogListener(".*?Execution of durable background task completed.*", () -> pendingDelLatch.countDown());
+ private final LogListener pendingDelFinishedLsnr = new CallbackExecutorLogListener(
+ ".*?Execution of durable background task completed.*",
+ () -> pendingDelLatch.countDown()
+ );
/** */
- private final LogListener idxsRebuildFinishedLsnr =
- new CallbackExecutorLogListener("Indexes rebuilding completed for all caches.", () -> idxsRebuildLatch.countDown());
+ private final LogListener idxsRebuildFinishedLsnr = new CallbackExecutorLogListener(
+ "Indexes rebuilding completed for all caches.",
+ () -> idxsRebuildLatch.countDown()
+ );
/** */
- private final LogListener taskLifecycleListener =
+ private final LogListener taskLifecycleLsnr =
new MessageOrderLogListener(
- ".*?Executing durable background task: DROP_SQL_INDEX-PUBLIC." + IDX_NAME + "-.*",
- ".*?Could not execute durable background task: DROP_SQL_INDEX-PUBLIC." + IDX_NAME + "-.*",
- ".*?Executing durable background task: DROP_SQL_INDEX-PUBLIC." + IDX_NAME + "-.*",
- ".*?Execution of durable background task completed: DROP_SQL_INDEX-PUBLIC." + IDX_NAME + "-.*"
+ ".*?Executing durable background task: drop-sql-index-SQL_PUBLIC_T-" + IDX_NAME + "-.*",
+ ".*?Could not execute durable background task: drop-sql-index-SQL_PUBLIC_T-" + IDX_NAME + "-.*",
+ ".*?Executing durable background task: drop-sql-index-SQL_PUBLIC_T-" + IDX_NAME + "-.*",
+ ".*?Execution of durable background task completed: drop-sql-index-SQL_PUBLIC_T-" + IDX_NAME + "-.*"
);
/**
@@ -153,15 +167,15 @@ public class LongDestroyDurableBackgroundTaskTest extends GridCommonAbstractTest
blockedSysCriticalThreadLsnr,
pendingDelFinishedLsnr,
idxsRebuildFinishedLsnr,
- taskLifecycleListener
+ taskLifecycleLsnr
);
/** */
- private InlineIndexFactory regularIdxFactory;
-
- /** */
private DurableBackgroundTaskTestListener durableBackgroundTaskTestLsnr;
+ /** Original {@link DurableBackgroundCleanupIndexTreeTaskV2#idxTreeFactory}. */
+ private InlineIndexTreeFactory originalFactory;
+
/** */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
return super.getConfiguration(igniteInstanceName)
@@ -184,9 +198,9 @@ public class LongDestroyDurableBackgroundTaskTest extends GridCommonAbstractTest
.setCacheConfiguration(
new CacheConfiguration(DEFAULT_CACHE_NAME)
.setBackups(1)
- .setSqlSchema("PUBLIC"),
+ .setSqlSchema(DFLT_SCHEMA),
new CacheConfiguration<Integer, Integer>("TEST")
- .setSqlSchema("PUBLIC")
+ .setSqlSchema(DFLT_SCHEMA)
.setBackups(1)
.setDataRegionName("dr1")
)
@@ -199,28 +213,28 @@ public class LongDestroyDurableBackgroundTaskTest extends GridCommonAbstractTest
cleanPersistenceDir();
- regularIdxFactory = IgniteH2Indexing.idxFactory;
-
- IgniteH2Indexing.idxFactory = new InlineIndexFactoryTest();
-
blockedSysCriticalThreadLsnr.reset();
pendingDelLatch = new CountDownLatch(1);
idxsRebuildLatch = new CountDownLatch(1);
+
+ originalFactory = idxTreeFactory;
+ idxTreeFactory = new InlineIndexTreeFactoryEx();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
blockedSysCriticalThreadLsnr.reset();
- IgniteH2Indexing.idxFactory = regularIdxFactory;
-
stopAllGrids();
cleanPersistenceDir();
durableBackgroundTaskTestLsnr = null;
+ idxTreeFactory = originalFactory;
+ originalFactory = null;
+
super.afterTest();
}
@@ -525,7 +539,7 @@ public class LongDestroyDurableBackgroundTaskTest extends GridCommonAbstractTest
.addCheckpointListener(durableBackgroundTaskTestLsnr);
}
- ignite.cluster().active(true);
+ ignite.cluster().state(ACTIVE);
ignite.cluster().baselineAutoAdjustEnabled(false);
@@ -673,7 +687,7 @@ public class LongDestroyDurableBackgroundTaskTest extends GridCommonAbstractTest
*/
@Test
public void testDestroyTaskLifecycle() throws Exception {
- taskLifecycleListener.reset();
+ taskLifecycleLsnr.reset();
IgniteEx ignite = prepareAndPopulateCluster(1, false, false);
@@ -681,9 +695,9 @@ public class LongDestroyDurableBackgroundTaskTest extends GridCommonAbstractTest
checkSelectAndPlan(cache, false);
- ignite.cluster().active(false);
+ ignite.cluster().state(INACTIVE);
- ignite.cluster().active(true);
+ ignite.cluster().state(ACTIVE);
ignite.cache(DEFAULT_CACHE_NAME).indexReadyFuture().get();
@@ -697,7 +711,7 @@ public class LongDestroyDurableBackgroundTaskTest extends GridCommonAbstractTest
awaitLatch(pendingDelLatch, "Test timed out: failed to await for durable background task completion.");
- assertTrue(taskLifecycleListener.check());
+ assertTrue(taskLifecycleLsnr.check());
}
/**
@@ -709,13 +723,15 @@ public class LongDestroyDurableBackgroundTaskTest extends GridCommonAbstractTest
public void testRemoveIndexesOnTableDrop() throws Exception {
IgniteEx ignite = startGrids(1);
- ignite.cluster().active(true);
+ ignite.cluster().state(ACTIVE);
IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(DEFAULT_CACHE_NAME);
- query(cache, "create table t1 (id integer primary key, p integer, f integer) with \"BACKUPS=1, CACHE_GROUP=grp_test_table\"");
+ query(cache, "create table t1 (id integer primary key, p integer, f integer) " +
+ "with \"BACKUPS=1, CACHE_GROUP=grp_test_table\"");
- query(cache, "create table t2 (id integer primary key, p integer, f integer) with \"BACKUPS=1, CACHE_GROUP=grp_test_table\"");
+ query(cache, "create table t2 (id integer primary key, p integer, f integer) " +
+ "with \"BACKUPS=1, CACHE_GROUP=grp_test_table\"");
query(cache, "create index t2_idx on t2 (p)");
@@ -723,11 +739,11 @@ public class LongDestroyDurableBackgroundTaskTest extends GridCommonAbstractTest
forceCheckpoint();
- CountDownLatch inxDeleteInAsyncTaskLatch = new CountDownLatch(1);
+ CountDownLatch inxDelInAsyncTaskLatch = new CountDownLatch(1);
LogListener lsnr = new CallbackExecutorLogListener(
- ".*?Execution of durable background task completed: DROP_SQL_INDEX-PUBLIC.T2_IDX-.*",
- () -> inxDeleteInAsyncTaskLatch.countDown()
+ ".*?Execution of durable background task completed: drop-sql-index-SQL_PUBLIC_T2-T2_IDX-.*",
+ inxDelInAsyncTaskLatch::countDown
);
testLog.registerListener(lsnr);
@@ -735,8 +751,9 @@ public class LongDestroyDurableBackgroundTaskTest extends GridCommonAbstractTest
ignite.destroyCache("SQL_PUBLIC_T2");
awaitLatch(
- inxDeleteInAsyncTaskLatch,
- "Failed to await for index deletion in async task (either index failed to delete in 1 minute or async task not started)"
+ inxDelInAsyncTaskLatch,
+ "Failed to await for index deletion in async task " +
+ "(either index failed to delete in 1 minute or async task not started)"
);
}
@@ -758,28 +775,36 @@ public class LongDestroyDurableBackgroundTaskTest extends GridCommonAbstractTest
assertTrue(durableBackgroundTaskTestLsnr.check());
}
- /** */
- private class InlineIndexFactoryTest extends InlineIndexFactory {
- /** */
- @Override protected InlineIndexTree createIndexSegment(GridCacheContext<?, ?> cctx, SortedIndexDefinition def,
- RootPage rootPage, IoStatisticsHolder stats, InlineRecommender recommender, int segmentNum) throws Exception {
- return new InlineIndexTreeTest(
- def,
- cctx,
- def.treeName(),
- cctx.offheap(),
- cctx.offheap().reuseListForIndex(def.treeName()),
- cctx.dataRegion().pageMemory(),
- PageIoResolver.DEFAULT_PAGE_IO_RESOLVER,
- rootPage.pageId().pageId(),
- rootPage.isAllocated(),
- def.inlineSize(),
- def.keyTypeSettings(),
- def.idxRowCache(),
- stats,
- def.rowHandlerFactory(),
- recommender);
- }
+ /**
+ * Checks that an old {@link DurableBackgroundCleanupIndexTreeTask} is converted into a {@link DurableBackgroundCleanupIndexTreeTaskV2}.
+ */
+ @Test
+ public void testConvertOldTaskToNew() {
+ String grpName = "grpTest";
+ String cacheName = "cacheTest";
+ String treeName = "treeTest";
+ String idxName = "idxTest";
+
+ List<Long> pages = F.asList(100L);
+
+ DurableBackgroundCleanupIndexTreeTask oldTask = new DurableBackgroundCleanupIndexTreeTask(
+ pages,
+ emptyList(),
+ grpName,
+ cacheName,
+ new IndexName(cacheName, "schemaTest", "tableTest", idxName),
+ treeName
+ );
+
+ DurableBackgroundTask convertedTask = oldTask.convertAfterRestoreIfNeeded();
+ assertTrue(convertedTask instanceof DurableBackgroundCleanupIndexTreeTaskV2);
+
+ assertEquals(grpName, getFieldValue(convertedTask, "grpName"));
+ assertEquals(cacheName, getFieldValue(convertedTask, "cacheName"));
+ assertEquals(treeName, getFieldValue(convertedTask, "oldTreeName"));
+ assertNotNull(getFieldValue(convertedTask, "newTreeName"));
+ assertEquals(idxName, getFieldValue(convertedTask, "idxName"));
+ assertEquals(pages.size(), (int)getFieldValue(convertedTask, "segments"));
}
/** */
@@ -787,7 +812,7 @@ public class LongDestroyDurableBackgroundTaskTest extends GridCommonAbstractTest
/** */
public InlineIndexTreeTest(
SortedIndexDefinition def,
- GridCacheContext<?, ?> cctx,
+ CacheGroupContext grpCtx,
String treeName,
IgniteCacheOffheapManager offheap,
ReuseList reuseList,
@@ -796,6 +821,7 @@ public class LongDestroyDurableBackgroundTaskTest extends GridCommonAbstractTest
long metaPageId,
boolean initNew,
int configuredInlineSize,
+ int maxInlineSize,
IndexKeyTypeSettings keyTypeSettings,
IndexRowCache rowCache,
IoStatisticsHolder stats,
@@ -804,7 +830,7 @@ public class LongDestroyDurableBackgroundTaskTest extends GridCommonAbstractTest
) throws IgniteCheckedException {
super(
def,
- cctx,
+ grpCtx,
treeName,
offheap,
reuseList,
@@ -813,6 +839,7 @@ public class LongDestroyDurableBackgroundTaskTest extends GridCommonAbstractTest
metaPageId,
initNew,
configuredInlineSize,
+ maxInlineSize,
keyTypeSettings,
rowCache,
stats,
@@ -908,4 +935,35 @@ public class LongDestroyDurableBackgroundTaskTest extends GridCommonAbstractTest
/* No op. */
}
}
+
+ /**
+ * Extension of {@link InlineIndexTreeFactory} for tests.
+ */
+ private class InlineIndexTreeFactoryEx extends InlineIndexTreeFactory {
+ /** {@inheritDoc} */
+ @Override protected InlineIndexTree create(
+ CacheGroupContext grpCtx,
+ RootPage rootPage,
+ String treeName
+ ) throws IgniteCheckedException {
+ return new InlineIndexTreeTest(
+ null,
+ grpCtx,
+ treeName,
+ grpCtx.offheap(),
+ grpCtx.offheap().reuseListForIndex(treeName),
+ grpCtx.dataRegion().pageMemory(),
+ PageIoResolver.DEFAULT_PAGE_IO_RESOLVER,
+ rootPage.pageId().pageId(),
+ false,
+ 0,
+ 0,
+ new IndexKeyTypeSettings(),
+ null,
+ null,
+ new NoopRowHandlerFactory(),
+ null
+ );
+ }
+ }
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/MultipleParallelCacheDeleteDeadlockTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/MultipleParallelCacheDeleteDeadlockTest.java
index 8f6ccbf..9adf25f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/MultipleParallelCacheDeleteDeadlockTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/MultipleParallelCacheDeleteDeadlockTest.java
@@ -30,39 +30,42 @@ import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.cache.query.index.sorted.DurableBackgroundCleanupIndexTreeTaskV2;
+import org.apache.ignite.internal.cache.query.index.sorted.DurableBackgroundCleanupIndexTreeTaskV2.InlineIndexTreeFactory;
import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyTypeSettings;
import org.apache.ignite.internal.cache.query.index.sorted.IndexRow;
import org.apache.ignite.internal.cache.query.index.sorted.IndexRowCache;
import org.apache.ignite.internal.cache.query.index.sorted.InlineIndexRowHandlerFactory;
import org.apache.ignite.internal.cache.query.index.sorted.SortedIndexDefinition;
-import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexFactory;
import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexTree;
import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineRecommender;
import org.apache.ignite.internal.metric.IoStatisticsHolder;
import org.apache.ignite.internal.pagemem.PageMemory;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.persistence.RootPage;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIoResolver;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.LongListReuseBag;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
-import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.util.lang.GridTuple3;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static java.util.Arrays.asList;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.cache.query.index.sorted.DurableBackgroundCleanupIndexTreeTaskV2.idxTreeFactory;
+import static org.apache.ignite.internal.processors.query.QueryUtils.DFLT_SCHEMA;
/**
*
*/
public class MultipleParallelCacheDeleteDeadlockTest extends GridCommonAbstractTest {
/** Latch that blocks test completion. */
- private CountDownLatch testCompletionBlockingLatch = new CountDownLatch(1);
+ private final CountDownLatch testCompletionBlockingLatch = new CountDownLatch(1);
/** Latch that blocks checkpoint. */
- private CountDownLatch checkpointBlockingLatch = new CountDownLatch(1);
+ private final CountDownLatch checkpointBlockingLatch = new CountDownLatch(1);
/** We imitate long index destroy in these tests, so this is delay for each page to destroy. */
private static final long TIME_FOR_EACH_INDEX_PAGE_TO_DESTROY = 300;
@@ -79,8 +82,8 @@ public class MultipleParallelCacheDeleteDeadlockTest extends GridCommonAbstractT
/** */
private static final String CACHE_GRP_2 = "cache_grp_2";
- /** */
- private InlineIndexFactory regularH2TreeFactory;
+ /** Original {@link DurableBackgroundCleanupIndexTreeTaskV2#idxTreeFactory}. */
+ private InlineIndexTreeFactory originalFactory;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
@@ -98,10 +101,10 @@ public class MultipleParallelCacheDeleteDeadlockTest extends GridCommonAbstractT
.setCacheConfiguration(
new CacheConfiguration(CACHE_1)
.setGroupName(CACHE_GRP_1)
- .setSqlSchema("PUBLIC"),
+ .setSqlSchema(DFLT_SCHEMA),
new CacheConfiguration(CACHE_2)
.setGroupName(CACHE_GRP_2)
- .setSqlSchema("PUBLIC")
+ .setSqlSchema(DFLT_SCHEMA)
);
}
@@ -111,19 +114,19 @@ public class MultipleParallelCacheDeleteDeadlockTest extends GridCommonAbstractT
cleanPersistenceDir();
- regularH2TreeFactory = IgniteH2Indexing.idxFactory;
-
- IgniteH2Indexing.idxFactory = new InlineIndexFactoryTest();
+ originalFactory = idxTreeFactory;
+ idxTreeFactory = new InlineIndexTreeFactoryEx();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
- IgniteH2Indexing.idxFactory = regularH2TreeFactory;
-
stopAllGrids();
cleanPersistenceDir();
+ idxTreeFactory = originalFactory;
+ originalFactory = null;
+
super.afterTest();
}
@@ -132,7 +135,7 @@ public class MultipleParallelCacheDeleteDeadlockTest extends GridCommonAbstractT
public void testMultipleCacheDelete() throws Exception {
IgniteEx ignite = startGrids(1);
- ignite.cluster().active(true);
+ ignite.cluster().state(ACTIVE);
IgniteCache cache1 = ignite.getOrCreateCache(CACHE_1);
IgniteCache cache2 = ignite.getOrCreateCache(CACHE_2);
@@ -207,30 +210,6 @@ public class MultipleParallelCacheDeleteDeadlockTest extends GridCommonAbstractT
return cache.query(new SqlFieldsQuery(qry).setArgs(args)).getAll();
}
- /** */
- private class InlineIndexFactoryTest extends InlineIndexFactory {
- /** */
- @Override protected InlineIndexTree createIndexSegment(GridCacheContext<?, ?> cctx, SortedIndexDefinition def,
- RootPage rootPage, IoStatisticsHolder stats, InlineRecommender recommender, int segmentNum) throws Exception {
- return new InlineIndexTreeTest(
- def,
- cctx,
- def.treeName(),
- cctx.offheap(),
- cctx.offheap().reuseListForIndex(def.treeName()),
- cctx.dataRegion().pageMemory(),
- PageIoResolver.DEFAULT_PAGE_IO_RESOLVER,
- rootPage.pageId().pageId(),
- rootPage.isAllocated(),
- def.inlineSize(),
- def.keyTypeSettings(),
- def.idxRowCache(),
- stats,
- def.rowHandlerFactory(),
- recommender);
- }
- }
-
/**
* Test Inline index tree.
*/
@@ -238,7 +217,7 @@ public class MultipleParallelCacheDeleteDeadlockTest extends GridCommonAbstractT
/** */
public InlineIndexTreeTest(
SortedIndexDefinition def,
- GridCacheContext<?, ?> cctx,
+ CacheGroupContext grpCtx,
String treeName,
IgniteCacheOffheapManager offheap,
ReuseList reuseList,
@@ -247,6 +226,7 @@ public class MultipleParallelCacheDeleteDeadlockTest extends GridCommonAbstractT
long metaPageId,
boolean initNew,
int configuredInlineSize,
+ int maxInlineSize,
IndexKeyTypeSettings keyTypeSettings,
IndexRowCache rowCache,
IoStatisticsHolder stats,
@@ -255,7 +235,7 @@ public class MultipleParallelCacheDeleteDeadlockTest extends GridCommonAbstractT
) throws IgniteCheckedException {
super(
def,
- cctx,
+ grpCtx,
treeName,
offheap,
reuseList,
@@ -264,6 +244,7 @@ public class MultipleParallelCacheDeleteDeadlockTest extends GridCommonAbstractT
metaPageId,
initNew,
configuredInlineSize,
+ maxInlineSize,
keyTypeSettings,
rowCache,
stats,
@@ -299,4 +280,35 @@ public class MultipleParallelCacheDeleteDeadlockTest extends GridCommonAbstractT
return 10;
}
}
+
+ /**
+ * Extension of {@link InlineIndexTreeFactory} for tests.
+ */
+ private class InlineIndexTreeFactoryEx extends InlineIndexTreeFactory {
+ /** {@inheritDoc} */
+ @Override protected InlineIndexTree create(
+ CacheGroupContext grpCtx,
+ RootPage rootPage,
+ String treeName
+ ) throws IgniteCheckedException {
+ return new InlineIndexTreeTest(
+ null,
+ grpCtx,
+ treeName,
+ grpCtx.offheap(),
+ grpCtx.offheap().reuseListForIndex(treeName),
+ grpCtx.dataRegion().pageMemory(),
+ PageIoResolver.DEFAULT_PAGE_IO_RESOLVER,
+ rootPage.pageId().pageId(),
+ false,
+ 0,
+ 0,
+ new IndexKeyTypeSettings(),
+ null,
+ null,
+ new DurableBackgroundCleanupIndexTreeTaskV2.NoopRowHandlerFactory(),
+ null
+ );
+ }
+ }
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
index 1dcfb60..3c4e311 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
@@ -21,6 +21,7 @@ import org.apache.ignite.internal.encryption.CacheGroupReencryptionTest;
import org.apache.ignite.internal.processors.cache.IgnitePdsSingleNodeWithIndexingAndGroupPutGetPersistenceSelfTest;
import org.apache.ignite.internal.processors.cache.IgnitePdsSingleNodeWithIndexingPutGetPersistenceTest;
import org.apache.ignite.internal.processors.cache.index.ClientReconnectWithSqlTableConfiguredTest;
+import org.apache.ignite.internal.processors.cache.index.DropIndexTest;
import org.apache.ignite.internal.processors.cache.index.ForceRebuildIndexTest;
import org.apache.ignite.internal.processors.cache.index.RenameIndexTreeTest;
import org.apache.ignite.internal.processors.cache.index.ResumeCreateIndexTest;
@@ -74,7 +75,8 @@ import org.junit.runners.Suite;
IgniteClusterSnapshotRestoreWithIndexingTest.class,
ResumeRebuildIndexTest.class,
ResumeCreateIndexTest.class,
- RenameIndexTreeTest.class
+ RenameIndexTreeTest.class,
+ DropIndexTest.class
})
public class IgnitePdsWithIndexingTestSuite {
}