You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/04/30 08:07:29 UTC
[ignite] branch master updated: IGNITE-14625 DurableBackgroundTask
threads management improved;
fixed issue when checkpointer could preemptively delete completed task,
potentially causing leaks. (#9037)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 598e97b IGNITE-14625 DurableBackgroundTask threads management improved; fixed issue when checkpointer could preemptively delete completed task, potentially causing leaks. (#9037)
598e97b is described below
commit 598e97bb876a5aa0394e83cd67622760f7d2507b
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Fri Apr 30 11:07:11 2021 +0300
IGNITE-14625 DurableBackgroundTask threads management improved; fixed issue when checkpointer could preemptively delete completed task, potentially causing leaks. (#9037)
---
.../DurableBackgroundCleanupIndexTreeTask.java | 87 +++-
.../query/index/sorted/inline/InlineIndexImpl.java | 2 +-
.../pendingtask/DurableBackgroundTask.java | 37 +-
.../pendingtask/DurableBackgroundTaskResult.java | 107 +++++
.../cluster/GridClusterStateProcessor.java | 2 +-
.../localtask/DurableBackgroundTaskState.java | 147 +++++++
.../localtask/DurableBackgroundTasksProcessor.java | 436 +++++++++++----------
.../apache/ignite/internal/util/IgniteUtils.java | 9 +-
.../IgniteDataStorageMetricsSelfTest.java | 10 -
.../persistence/wal/WalArchiveConsistencyTest.java | 11 -
.../DurableBackgroundTasksProcessorSelfTest.java | 401 +++++++++++++++++++
.../localtask/ObservingCheckpointListener.java | 120 ++++++
.../internal/processors/localtask/SimpleTask.java | 102 +++++
.../junits/common/GridCommonAbstractTest.java | 10 +
.../ignite/testsuites/IgnitePdsTestSuite.java | 3 +
15 files changed, 1212 insertions(+), 272 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/DurableBackgroundCleanupIndexTreeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/DurableBackgroundCleanupIndexTreeTask.java
index 4e23c11..00cb35e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/DurableBackgroundCleanupIndexTreeTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/DurableBackgroundCleanupIndexTreeTask.java
@@ -22,7 +22,9 @@ import java.util.List;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cache.query.index.IndexName;
import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexKeyType;
import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexTree;
@@ -36,11 +38,18 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.RootPage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult;
import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIoResolver;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.thread.IgniteThread;
+import org.jetbrains.annotations.Nullable;
+import static java.util.Collections.singleton;
import static org.apache.ignite.internal.metric.IoStatisticsType.SORTED_INDEX;
/**
@@ -54,10 +63,7 @@ public class DurableBackgroundCleanupIndexTreeTask implements DurableBackgroundT
private List<Long> rootPages;
/** */
- private transient List<InlineIndexTree> trees;
-
- /** */
- private transient volatile boolean completed;
+ private transient volatile List<InlineIndexTree> trees;
/** */
private String cacheGrpName;
@@ -77,6 +83,12 @@ public class DurableBackgroundCleanupIndexTreeTask implements DurableBackgroundT
/** */
private final String id;
+ /** Logger. */
+ @Nullable private transient volatile IgniteLogger log;
+
+ /** Worker that executes this task, {@code null} if the task is not running. */
+ @Nullable private transient volatile GridWorker worker;
+
/** */
public DurableBackgroundCleanupIndexTreeTask(
List<Long> rootPages,
@@ -88,7 +100,6 @@ public class DurableBackgroundCleanupIndexTreeTask implements DurableBackgroundT
) {
this.rootPages = rootPages;
this.trees = trees;
- this.completed = false;
this.cacheGrpName = cacheGrpName;
this.cacheName = cacheName;
this.id = UUID.randomUUID().toString();
@@ -98,12 +109,51 @@ public class DurableBackgroundCleanupIndexTreeTask implements DurableBackgroundT
}
/** {@inheritDoc} */
- @Override public String shortName() {
+ @Override public String name() {
return "DROP_SQL_INDEX-" + schemaName + "." + idxName + "-" + id;
}
/** {@inheritDoc} */
- @Override public void execute(GridKernalContext ctx) {
+ @Override public IgniteInternalFuture<DurableBackgroundTaskResult> executeAsync(GridKernalContext ctx) {
+ log = ctx.log(this.getClass());
+
+ assert worker == null;
+
+ GridFutureAdapter<DurableBackgroundTaskResult> fut = new GridFutureAdapter<>();
+
+ worker = new GridWorker(
+ ctx.igniteInstanceName(),
+ "async-durable-background-task-executor-" + name(),
+ log
+ ) {
+ /** {@inheritDoc} */
+ @Override protected void body() {
+ try {
+ execute(ctx);
+
+ worker = null;
+
+ fut.onDone(DurableBackgroundTaskResult.complete(null));
+ }
+ catch (Throwable t) {
+ worker = null;
+
+ fut.onDone(DurableBackgroundTaskResult.restart(t));
+ }
+ }
+ };
+
+ new IgniteThread(worker).start();
+
+ return fut;
+ }
+
+ /**
+ * Task execution.
+ *
+ * @param ctx Kernal context.
+ */
+ private void execute(GridKernalContext ctx) {
List<InlineIndexTree> trees0 = trees;
if (trees0 == null) {
@@ -173,7 +223,7 @@ public class DurableBackgroundCleanupIndexTreeTask implements DurableBackgroundT
// Below we create a fake index tree using it's root page, stubbing some parameters,
// because we just going to free memory pages that are occupied by tree structure.
try {
- String treeName = "deletedTree_" + i + "_" + shortName();
+ String treeName = "deletedTree_" + i + "_" + name();
InlineIndexTree tree = new InlineIndexTree(
null, cctx, treeName, cctx.offheap(), cctx.offheap().reuseListForIndex(treeName),
@@ -226,7 +276,8 @@ public class DurableBackgroundCleanupIndexTreeTask implements DurableBackgroundT
return pageAddr == 0;
}
finally {
- pageMem.readUnlock(grpId, rootPageId, page);
+ if (pageAddr != 0)
+ pageMem.readUnlock(grpId, rootPageId, page);
}
}
finally {
@@ -239,18 +290,16 @@ public class DurableBackgroundCleanupIndexTreeTask implements DurableBackgroundT
}
/** {@inheritDoc} */
- @Override public void complete() {
- completed = true;
- }
+ @Override public void cancel() {
+ trees = null;
- /** {@inheritDoc} */
- @Override public boolean isCompleted() {
- return completed;
- }
+ GridWorker w = worker;
- /** {@inheritDoc} */
- @Override public void onCancel() {
- trees = null;
+ if (w != null) {
+ worker = null;
+
+ U.awaitForWorkersStop(singleton(w), true, log);
+ }
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java
index 9dd6022..841ef0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java
@@ -449,7 +449,7 @@ public class InlineIndexImpl extends AbstractIndex implements InlineIndex {
treeName
);
- cctx.kernalContext().durableBackgroundTasksProcessor().startDurableBackgroundTask(task, cctx.config());
+ cctx.kernalContext().durableBackgroundTasksProcessor().executeAsync(task, cctx.config());
}
}
catch (IgniteCheckedException e) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTask.java
index 355f624..743f84c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTask.java
@@ -18,41 +18,32 @@ package org.apache.ignite.internal.processors.cache.persistence.metastorage.pend
import java.io.Serializable;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
/**
- * Durable task that should be used to do long operations (e.g. index deletion) in background
- * for cases when node with persistence can fail before operation is completed. After start, node reads it's
- * pending background tasks from metastorage and completes them.
+ * Durable task that should be used to do long operations (e.g. index deletion) in background.
*/
public interface DurableBackgroundTask extends Serializable {
/**
- * Short unique name of the task is used to build metastorage key for saving this task and for logging.
+ * Getting the name of the task to identify it.
+ * Also used as part of a key for storage in a MetaStorage.
*
- * @return Short name of this task.
+ * @return Task name.
*/
- public String shortName();
+ String name();
/**
- * Method that executes the task. It is called after node start.
- *
- * @param ctx Grid kernal context.
- */
- public void execute(GridKernalContext ctx);
-
- /**
- * Method that marks task as complete.
+ * Canceling the task.
*/
- public void complete();
+ void cancel();
/**
- * Method that return completion flag.
+ * Asynchronous task execution.
*
- * @return flag that task completed.
- */
- public boolean isCompleted();
-
- /**
- * Callback for task cancellation.
+ * Completion of the task execution should be only with the {@link DurableBackgroundTaskResult result}.
+ *
+ * @param ctx Kernal context.
+ * @return Future of the task execution.
*/
- public void onCancel();
+ IgniteInternalFuture<DurableBackgroundTaskResult> executeAsync(GridKernalContext ctx);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTaskResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTaskResult.java
new file mode 100644
index 0000000..5d82e60
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTaskResult.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Result of executing a durable background task.
+ * <p>
+ * There may be the following states:
+ * <ul>
+ * <li>{@link #completed Completed} - the task has completed its execution and should be deleted.</li>
+ * <li>{@link #restart Restart} - the task has not yet completed its execution and must be restarted.</li>
+ * </ul>
+ */
+public class DurableBackgroundTaskResult {
+ /** Completed state. */
+ private static final Object COMPLETED = new Object();
+
+ /** Restarted state. */
+ private static final Object RESTART = new Object();
+
+ /** Execution state. */
+ private final Object state;
+
+ /** An error occurred while executing the task. */
+ @Nullable private final Throwable err;
+
+ /**
+ * Constructor.
+ *
+ * @param res Execution state.
+ * @param err An error occurred while executing the task.
+ */
+ private DurableBackgroundTaskResult(Object res, @Nullable Throwable err) {
+ this.state = res;
+ this.err = err;
+ }
+
+ /**
+ * Creation of a completed task execution result that does not require restarting it.
+ *
+ * @param err An error occurred while executing the task.
+ * @return Result of executing a durable background task.
+ */
+ public static DurableBackgroundTaskResult complete(@Nullable Throwable err) {
+ return new DurableBackgroundTaskResult(COMPLETED, err);
+ }
+
+ /**
+ * Creation of a task execution result that requires its restart.
+ *
+ * @param err An error occurred while executing the task.
+ * @return Result of executing a durable background task.
+ */
+ public static DurableBackgroundTaskResult restart(@Nullable Throwable err) {
+ return new DurableBackgroundTaskResult(RESTART, err);
+ }
+
+ /**
+ * Checking the completion of the task.
+ *
+ * @return {@code True} if completed.
+ */
+ public boolean completed() {
+ return state == COMPLETED;
+ }
+
+ /**
+ * Checking if the task needs to be restarted.
+ *
+ * @return {@code True} if the task needs to be restarted.
+ */
+ public boolean restart() {
+ return state == RESTART;
+ }
+
+ /**
+ * Getting a task execution error.
+ *
+ * @return An error occurred while executing the task.
+ */
+ @Nullable public Throwable error() {
+ return err;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DurableBackgroundTaskResult.class, this);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 73afdf9..a3d1fdd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -729,7 +729,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
nodeIds
);
- ctx.durableBackgroundTasksProcessor().onStateChange(msg);
+ ctx.durableBackgroundTasksProcessor().onStateChangeStarted(msg);
if (msg.forceChangeBaselineTopology())
newState.setTransitionResult(msg.requestId(), msg.state());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTaskState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTaskState.java
new file mode 100644
index 0000000..32ab7b9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTaskState.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.localtask;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.INIT;
+
+/**
+ * Class for storing the current state of a durable background task.
+ *
+ * Task execution state transitions:
+ * INIT -> PREPARE -> STARTED -> COMPLETED
+ *
+ * If the task needs to be restarted, it must have status INIT.
+ */
+public class DurableBackgroundTaskState {
+ /**
+ * Enumeration of the current state of the task.
+ */
+ public enum State {
+ /** Initial state. */
+ INIT,
+
+ /** Preparation for execution state. */
+ PREPARE,
+
+ /** Execution state. */
+ STARTED,
+
+ /** Completion state. */
+ COMPLETED
+ }
+
+ /** Current state atomic updater. */
+ private static final AtomicReferenceFieldUpdater<DurableBackgroundTaskState, State> STATE_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(DurableBackgroundTaskState.class, State.class, "state");
+
+ /** Durable background task. */
+ private final DurableBackgroundTask task;
+
+ /** Outside task future. */
+ @Nullable private final GridFutureAdapter<Void> outFut;
+
+ /** Task has been saved to the MetaStorage. */
+ private final boolean saved;
+
+ /** Current state of the task. */
+ private volatile State state = INIT;
+
+ /**
+ * Constructor.
+ *
+ * @param task Durable background task.
+ * @param outFut Outside task future.
+ * @param saved Task has been saved to the MetaStorage.
+ */
+ public DurableBackgroundTaskState(
+ DurableBackgroundTask task,
+ @Nullable GridFutureAdapter<Void> outFut,
+ boolean saved
+ ) {
+ this.task = task;
+ this.outFut = outFut;
+ this.saved = saved;
+ }
+
+ /**
+ * Getting durable background task.
+ *
+ * @return Durable background task.
+ */
+ public DurableBackgroundTask task() {
+ return task;
+ }
+
+ /**
+ * Getting outside task future.
+ *
+ * @return Outside task future.
+ */
+ @Nullable public GridFutureAdapter<Void> outFuture() {
+ return outFut;
+ }
+
+ /**
+ * Check if the task has been saved to the MetaStorage.
+ *
+ * @return {@code True} if stored in the MetaStorage.
+ */
+ public boolean saved() {
+ return saved;
+ }
+
+ /**
+ * Getting current state of the task.
+ *
+ * @return Current state of the task.
+ */
+ public State state() {
+ return state;
+ }
+
+ /**
+ * Set the current state of the task.
+ *
+ * @param s New current state of the task.
+ */
+ public void state(State s) {
+ state = s;
+ }
+
+ /**
+ * Atomically sets the current task state.
+ *
+ * @param exp Expected state.
+ * @param newState New state.
+ * @return {@code True} if successful.
+ */
+ public boolean state(State exp, State newState) {
+ return STATE_UPDATER.compareAndSet(this, exp, newState);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DurableBackgroundTaskState.class, this);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
index dfbab72..9fb998f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
@@ -16,315 +16,341 @@
*/
package org.apache.ignite.internal.processors.localtask;
-import java.util.Map;
-import java.util.Set;
+import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.client.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
-import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageTree;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.thread.IgniteThread;
-import static org.apache.ignite.internal.util.IgniteUtils.awaitForWorkersStop;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.COMPLETED;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.INIT;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.PREPARE;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.STARTED;
/**
- * Processor that is responsible for durable background tasks that are executed on local node
- * and should be continued even after node restart.
+ * Processor that is responsible for durable background tasks that are executed on local node.
*/
public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implements MetastorageLifecycleListener,
CheckpointListener {
/** Prefix for metastorage keys for durable background tasks. */
- private static final String STORE_DURABLE_BACKGROUND_TASK_PREFIX = "durable-background-task-";
+ private static final String TASK_PREFIX = "durable-background-task-";
- /** Metastorage. */
- private volatile ReadWriteMetastorage metastorage;
-
- /** Metastorage synchronization mutex. */
+ /** MetaStorage synchronization mutex. */
private final Object metaStorageMux = new Object();
- /** Set of workers that executing durable background tasks. */
- private final Set<GridWorker> asyncDurableBackgroundTaskWorkers = new GridConcurrentHashSet<>();
-
- /** Count of workers that executing durable background tasks. */
- private final AtomicInteger asyncDurableBackgroundTasksWorkersCntr = new AtomicInteger(0);
+ /** Current tasks. Mapping: {@link DurableBackgroundTask#name task name} -> task state. */
+ private final ConcurrentMap<String, DurableBackgroundTaskState> tasks = new ConcurrentHashMap<>();
- /** Durable background tasks map. */
- private final ConcurrentHashMap<String, DurableBackgroundTask> durableBackgroundTasks = new ConcurrentHashMap<>();
-
- /** Set of started tasks' names. */
- private final Set<String> startedTasks = new GridConcurrentHashSet<>();
+ /** Lock for canceling tasks. */
+ private final ReadWriteLock cancelLock = new ReentrantReadWriteLock(true);
/**
- * Ban to start new tasks. The first time the cluster is activated, it will try again to run existing tasks.
- *
- * @see #onStateChangeFinish(ChangeGlobalStateFinishMessage)
+ * Tasks to be removed from the MetaStorage after the end of a checkpoint.
+ * Mapping: {@link DurableBackgroundTask#name task name} -> task.
*/
- private volatile boolean forbidStartingNewTasks;
+ private final ConcurrentMap<String, DurableBackgroundTask> toRmv = new ConcurrentHashMap<>();
+
+ /** Prohibiting the execution of tasks. */
+ private volatile boolean prohibitionExecTasks = true;
/**
+ * Constructor.
+ *
* @param ctx Kernal context.
*/
public DurableBackgroundTasksProcessor(GridKernalContext ctx) {
super(ctx);
}
- /**
- * Starts the asynchronous operation of pending tasks execution. Is called on start.
- */
- private void asyncDurableBackgroundTasksExecution() {
- assert durableBackgroundTasks != null;
-
- for (DurableBackgroundTask task : durableBackgroundTasks.values()) {
- if (!task.isCompleted() && startedTasks.add(task.shortName()))
- asyncDurableBackgroundTaskExecute(task);
- }
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteCheckedException {
+ ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
}
- /**
- * Creates a worker to execute single durable background task.
- *
- * @param task Task.
- */
- private void asyncDurableBackgroundTaskExecute(DurableBackgroundTask task) {
- String workerName = "async-durable-background-task-executor-" + asyncDurableBackgroundTasksWorkersCntr.getAndIncrement();
+ /** {@inheritDoc} */
+ @Override public void onKernalStop(boolean cancel) {
+ cancelTasks();
+ }
- GridWorker worker = new GridWorker(ctx.igniteInstanceName(), workerName, log) {
- @Override public void cancel() {
- task.onCancel();
+ /** {@inheritDoc} */
+ @Override public void onReadyForRead(ReadOnlyMetastorage metastorage) {
+ metaStorageOperation(metaStorage -> {
+ assert metaStorage != null;
+
+ metaStorage.iterate(
+ TASK_PREFIX,
+ (k, v) -> {
+ DurableBackgroundTask t = (DurableBackgroundTask)v;
+
+ tasks.put(t.name(), new DurableBackgroundTaskState(t, null, true));
+ },
+ true
+ );
+ });
+ }
- super.cancel();
- }
+ /** {@inheritDoc} */
+ @Override public void onReadyForReadWrite(ReadWriteMetastorage metastorage) {
+ ((GridCacheDatabaseSharedManager)ctx.cache().context().database()).addCheckpointListener(this);
+ }
- @Override protected void body() {
- try {
- if (forbidStartingNewTasks)
- return;
+ /** {@inheritDoc} */
+ @Override public void beforeCheckpointBegin(Context ctx) {
+ /* No op. */
+ }
- log.info("Executing durable background task: " + task.shortName());
+ /** {@inheritDoc} */
+ @Override public void onMarkCheckpointBegin(Context ctx) {
+ for (Iterator<DurableBackgroundTaskState> it = tasks.values().iterator(); it.hasNext(); ) {
+ DurableBackgroundTaskState taskState = it.next();
- task.execute(ctx);
+ if (taskState.state() == COMPLETED) {
+ assert taskState.saved();
- task.complete();
+ DurableBackgroundTask t = taskState.task();
- log.info("Execution of durable background task completed: " + task.shortName());
- }
- catch (Throwable e) {
- log.error("Could not execute durable background task: " + task.shortName(), e);
- }
- finally {
- startedTasks.remove(task.shortName());
+ toRmv.put(t.name(), t);
- asyncDurableBackgroundTaskWorkers.remove(this);
- }
+ it.remove();
}
- };
-
- asyncDurableBackgroundTaskWorkers.add(worker);
-
- Thread asyncTask = new IgniteThread(worker);
-
- asyncTask.start();
+ }
}
/** {@inheritDoc} */
- @Override public void onKernalStart(boolean active) {
- asyncDurableBackgroundTasksExecution();
+ @Override public void onCheckpointBegin(Context ctx) {
+ /* No op. */
}
/** {@inheritDoc} */
- @Override public void onKernalStop(boolean cancel) {
- forbidStartingNewTasks = true;
+ @Override public void afterCheckpointEnd(Context ctx) {
+ for (Iterator<DurableBackgroundTask> it = toRmv.values().iterator(); it.hasNext(); ) {
+ DurableBackgroundTask t = it.next();
- awaitForWorkersStop(asyncDurableBackgroundTaskWorkers, true, log);
- }
+ metaStorageOperation(metaStorage -> {
+ if (metaStorage != null && toRmv.containsKey(t.name()))
+ metaStorage.remove(metaStorageKey(t));
+ });
- /** {@inheritDoc} */
- @Override public void start() throws IgniteCheckedException {
- ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
+ it.remove();
+ }
}
/**
- * @param msg Message.
+ * Callback at the start of a global state change.
+ *
+ * @param msg Message for change cluster global state.
*/
- public void onStateChange(ChangeGlobalStateMessage msg) {
- if (msg.state() == ClusterState.INACTIVE) {
- forbidStartingNewTasks = true;
-
- awaitForWorkersStop(asyncDurableBackgroundTaskWorkers, true, log);
- }
+ public void onStateChangeStarted(ChangeGlobalStateMessage msg) {
+ if (msg.state() == ClusterState.INACTIVE)
+ cancelTasks();
}
/**
- * @param msg Message.
+ * Callback on finish of a global state change.
+ *
+ * @param msg Finish message for change cluster global state.
*/
public void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) {
if (msg.state() != ClusterState.INACTIVE) {
- forbidStartingNewTasks = false;
-
- asyncDurableBackgroundTasksExecution();
- }
- }
+ prohibitionExecTasks = false;
- /** {@inheritDoc} */
- @Override public void onReadyForRead(ReadOnlyMetastorage metastorage) {
- synchronized (metaStorageMux) {
- if (durableBackgroundTasks.isEmpty()) {
- try {
- metastorage.iterate(
- STORE_DURABLE_BACKGROUND_TASK_PREFIX,
- (key, val) -> durableBackgroundTasks.put(key, (DurableBackgroundTask)val),
- true
- );
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException("Failed to iterate durable background tasks storage.", e);
- }
+ for (DurableBackgroundTaskState taskState : tasks.values()) {
+ if (!prohibitionExecTasks)
+ executeAsync0(taskState.task());
}
}
}
- /** {@inheritDoc} */
- @Override public void onReadyForReadWrite(ReadWriteMetastorage metastorage) {
- synchronized (metaStorageMux) {
- try {
- for (Map.Entry<String, DurableBackgroundTask> entry : durableBackgroundTasks.entrySet()) {
- if (metastorage.readRaw(entry.getKey()) == null)
- metastorage.write(entry.getKey(), entry.getValue());
- }
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException("Failed to read key from durable background tasks storage.", e);
- }
+ /**
+ * Asynchronous execution of a durable background task.
+ *
+ * A new task will be added for execution only if there is no task with
+ * the same {@link DurableBackgroundTask#name name} or the previous one has already completed.
+ *
+ * If the task is required to be completed after restarting the node,
+ * then it must be saved to the MetaStorage.
+ *
+ * If the task is saved to the Metastorage, then it will be deleted from it
+ * only after its completion and at the end of the checkpoint. Otherwise, it
+ * will be removed as soon as it is completed.
+ *
+ * @param task Durable background task.
+ * @param save Save task to MetaStorage.
+ * @return Future that will complete when the task is completed.
+ */
+ public IgniteInternalFuture<Void> executeAsync(DurableBackgroundTask task, boolean save) {
+ DurableBackgroundTaskState taskState = tasks.compute(task.name(), (taskName, prev) -> {
+ if (prev != null && prev.state() != COMPLETED)
+ throw new IllegalArgumentException("Task is already present and has not been completed: " + taskName);
+
+ if (save)
+ toRmv.remove(taskName);
+
+ return new DurableBackgroundTaskState(task, new GridFutureAdapter<>(), save);
+ });
+
+ if (save) {
+ metaStorageOperation(metaStorage -> {
+ if (metaStorage != null)
+ metaStorage.write(metaStorageKey(task), task);
+ });
}
- ((GridCacheDatabaseSharedManager)ctx.cache().context().database()).addCheckpointListener(this);
+ if (!prohibitionExecTasks)
+ executeAsync0(task);
- this.metastorage = metastorage;
+ return taskState.outFuture();
}
/**
- * Builds a metastorage key for durable background task object.
+ * Overload of {@link #executeAsync(DurableBackgroundTask, boolean)}.
+ * If task is applied to persistent cache, saves it to MetaStorage.
*
- * @param obj Object.
- * @return Metastorage key.
+ * @param t Durable background task.
+ * @param cacheCfg Cache configuration.
+ * @return Future that will complete when the task is completed.
*/
- private String durableBackgroundTaskMetastorageKey(DurableBackgroundTask obj) {
- String k = STORE_DURABLE_BACKGROUND_TASK_PREFIX + obj.shortName();
-
- if (k.length() > MetastorageTree.MAX_KEY_LEN) {
- int hashLenLimit = 5;
-
- String hash = String.valueOf(k.hashCode());
-
- k = k.substring(0, MetastorageTree.MAX_KEY_LEN - hashLenLimit) +
- (hash.length() > hashLenLimit ? hash.substring(0, hashLenLimit) : hash);
- }
-
- return k;
+ public IgniteInternalFuture<Void> executeAsync(DurableBackgroundTask t, CacheConfiguration cacheCfg) {
+ return executeAsync(t, CU.isPersistentCache(cacheCfg, ctx.config().getDataStorageConfiguration()));
}
/**
- * Adds durable background task object.
+ * Asynchronous execution of a durable background task.
*
- * @param obj Object.
+ * @param t Durable background task.
*/
- private void addDurableBackgroundTask(DurableBackgroundTask obj) {
- String objName = durableBackgroundTaskMetastorageKey(obj);
+ private void executeAsync0(DurableBackgroundTask t) {
+ cancelLock.readLock().lock();
- synchronized (metaStorageMux) {
- durableBackgroundTasks.put(objName, obj);
-
- if (metastorage != null) {
- ctx.cache().context().database().checkpointReadLock();
-
- try {
- metastorage.write(objName, obj);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- finally {
- ctx.cache().context().database().checkpointReadUnlock();
- }
+ try {
+ DurableBackgroundTaskState taskState = tasks.get(t.name());
+
+ if (taskState != null && taskState.state(INIT, PREPARE)) {
+ if (log.isInfoEnabled())
+ log.info("Executing durable background task: " + t.name());
+
+ t.executeAsync(ctx).listen(f -> {
+ DurableBackgroundTaskResult res = null;
+
+ try {
+ res = f.get();
+ }
+ catch (Throwable e) {
+ log.error("Task completed with an error: " + t.name(), e);
+ }
+
+ assert res != null;
+
+ if (res.error() != null)
+ log.error("Could not execute durable background task: " + t.name(), res.error());
+
+ if (res.completed()) {
+ if (res.error() == null && log.isInfoEnabled())
+ log.info("Execution of durable background task completed: " + t.name());
+
+ if (taskState.saved())
+ taskState.state(COMPLETED);
+ else
+ tasks.remove(t.name());
+
+ GridFutureAdapter<Void> outFut = taskState.outFuture();
+
+ if (outFut != null)
+ outFut.onDone(res.error());
+ }
+ else {
+ assert res.restart();
+
+ if (log.isInfoEnabled())
+ log.info("Execution of durable background task will be restarted: " + t.name());
+
+ taskState.state(INIT);
+ }
+ });
+
+ taskState.state(PREPARE, STARTED);
}
}
+ finally {
+ cancelLock.readLock().unlock();
+ }
}
/**
- * Removes durable background task object.
- *
- * @param obj Object.
+ * Cancels the tasks that are currently being executed
+ * and prohibits the execution of tasks.
*/
- private void removeDurableBackgroundTask(DurableBackgroundTask obj) {
- String objName = durableBackgroundTaskMetastorageKey(obj);
+ private void cancelTasks() {
+ prohibitionExecTasks = true;
- synchronized (metaStorageMux) {
- durableBackgroundTasks.remove(objName);
-
- if (metastorage != null) {
- ctx.cache().context().database().checkpointReadLock();
-
- try {
- metastorage.remove(objName);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- finally {
- ctx.cache().context().database().checkpointReadUnlock();
- }
+ cancelLock.writeLock().lock();
+
+ try {
+ for (DurableBackgroundTaskState taskState : tasks.values()) {
+ if (taskState.state() == STARTED)
+ taskState.task().cancel();
}
}
+ finally {
+ cancelLock.writeLock().unlock();
+ }
}
/**
- * Starts durable background task. If task is applied to persistent cache, saves it to metastorage.
+ * Performing an operation on a {@link MetaStorage}.
+ * Guarded by {@link #metaStorageMux}.
*
- * @param task Continuous task.
- * @param ccfg Cache configuration.
+ * @param consumer MetaStorage operation, argument can be {@code null}.
+ * @throws IgniteException If an exception is thrown from the {@code consumer}.
*/
- public void startDurableBackgroundTask(DurableBackgroundTask task, CacheConfiguration ccfg) {
- if (CU.isPersistentCache(ccfg, ctx.config().getDataStorageConfiguration()))
- addDurableBackgroundTask(task);
+ private void metaStorageOperation(IgniteThrowableConsumer<MetaStorage> consumer) throws IgniteException {
+ synchronized (metaStorageMux) {
+ IgniteCacheDatabaseSharedManager dbMgr = ctx.cache().context().database();
- asyncDurableBackgroundTaskExecute(task);
- }
+ dbMgr.checkpointReadLock();
- /** {@inheritDoc} */
- @Override public void onMarkCheckpointBegin(Context ctx) {
- /* No op. */
- }
-
- /** {@inheritDoc} */
- @Override public void onCheckpointBegin(Context ctx) {
- /* No op. */
- }
+ try {
+ MetaStorage metaStorage = dbMgr.metaStorage();
- /** {@inheritDoc} */
- @Override public void beforeCheckpointBegin(Context ctx) {
- /* No op. */
+ consumer.accept(metaStorage);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ finally {
+ dbMgr.checkpointReadUnlock();
+ }
+ }
}
- /** {@inheritDoc} */
- @Override public void afterCheckpointEnd(Context ctx) {
- for (DurableBackgroundTask task : durableBackgroundTasks.values()) {
- if (task.isCompleted())
- removeDurableBackgroundTask(task);
- }
+ /**
+ * Getting the task key for the MetaStorage.
+ *
+ * @param t Durable background task.
+ * @return MetaStorage {@code t} key.
+ */
+ static String metaStorageKey(DurableBackgroundTask t) {
+ return TASK_PREFIX + t.name();
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index d72dc8f..de79bf8 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -11669,7 +11669,11 @@ public abstract class IgniteUtils {
* @param cancel Wheter should cancel workers.
* @param log Logger.
*/
- public static void awaitForWorkersStop(Collection<GridWorker> workers, boolean cancel, IgniteLogger log) {
+ public static void awaitForWorkersStop(
+ Collection<GridWorker> workers,
+ boolean cancel,
+ @Nullable IgniteLogger log
+ ) {
for (GridWorker worker : workers) {
try {
if (cancel)
@@ -11678,7 +11682,8 @@ public abstract class IgniteUtils {
worker.join();
}
catch (Exception e) {
- log.warning(String.format("Failed to cancel grid runnable [%s]: %s", worker.toString(), e.getMessage()));
+ if (log != null)
+ log.warning("Failed to cancel grid runnable [" + worker.toString() + "]: " + e.getMessage());
}
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java
index eea82cf..9a42ff1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java
@@ -440,16 +440,6 @@ public class IgniteDataStorageMetricsSelfTest extends GridCommonAbstractTest {
}
/**
- * Getting db manager of node.
- *
- * @param n Node.
- * @return Db manager.
- */
- private GridCacheDatabaseSharedManager dbMgr(IgniteEx n) {
- return (GridCacheDatabaseSharedManager)n.context().cache().context().database();
- }
-
- /**
* Getting DATASTORAGE_METRIC_PREFIX metric registry.
*
* @param n Node.
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalArchiveConsistencyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalArchiveConsistencyTest.java
index a060eed..c6dcf90 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalArchiveConsistencyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalArchiveConsistencyTest.java
@@ -28,7 +28,6 @@ import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -271,14 +270,4 @@ public class WalArchiveConsistencyTest extends GridCommonAbstractTest {
", segNum=" + segments + ", currSeg=" + walMgr(n).currentSegment() + ']');
}
}
-
- /**
- * Getting db manager of node.
- *
- * @param n Node.
- * @return Db manager.
- */
- private GridCacheDatabaseSharedManager dbMgr(IgniteEx n) {
- return (GridCacheDatabaseSharedManager)n.context().cache().context().database();
- }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessorSelfTest.java
new file mode 100644
index 0000000..ba0da72
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessorSelfTest.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.localtask;
+
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
+import org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteThrowableFunction;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.INACTIVE;
+import static org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult.complete;
+import static org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult.restart;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.COMPLETED;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.INIT;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.STARTED;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTasksProcessor.metaStorageKey;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+
+/**
+ * Class for testing the {@link DurableBackgroundTasksProcessor}.
+ */
+public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ stopAllGrids();
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setFailureHandler(new StopNodeFailureHandler())
+ .setDataStorageConfiguration(
+ new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))
+ );
+ }
+
+ /**
+ * Checking the correctness of work (without saving to the MetaStorage) of the task in normal mode.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSimpleTaskExecutionWithoutMetaStorage() throws Exception {
+ checkSimpleTaskExecute(false);
+ }
+
+ /**
+ * Checking the correctness of work (with saving to the MetaStorage) of the task in normal mode.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSimpleTaskExecutionWithMetaStorage() throws Exception {
+ checkSimpleTaskExecute(true);
+ }
+
+ /**
+ * Checking the correctness of restarting the task without MetaStorage.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRestartTaskExecutionWithoutMetaStorage() throws Exception {
+ checkRestartTaskExecute(false);
+ }
+
+ /**
+ * Checking the correctness of restarting the task with MetaStorage.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRestartTaskExecutionWithMetaStorage() throws Exception {
+ checkRestartTaskExecute(true);
+ }
+
+ /**
+ * Checking the correctness of cancelling the task.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testCancelTaskExecution() throws Exception {
+ IgniteEx n = startGrid(0);
+ n.cluster().state(ACTIVE);
+
+ SimpleTask t = new SimpleTask("t");
+ IgniteInternalFuture<Void> execAsyncFut = execAsync(n, t, false);
+
+ t.onExecFut.get(getTestTimeout());
+ checkStateAndMetaStorage(n, t, STARTED, false);
+ assertFalse(execAsyncFut.isDone());
+
+ n.cluster().state(INACTIVE);
+
+ t.onExecFut.get(getTestTimeout());
+ t.taskFut.onDone(complete(null));
+
+ execAsyncFut.get(getTestTimeout());
+ }
+
+ /**
+ * Check that the task will be restarted after restarting the node.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRestartTaskAfterRestartNode() throws Exception {
+ IgniteEx n = startGrid(0);
+ n.cluster().state(ACTIVE);
+
+ SimpleTask t = new SimpleTask("t");
+ execAsync(n, t, true);
+
+ t.onExecFut.get(getTestTimeout());
+ checkStateAndMetaStorage(n, t, STARTED, true);
+ t.taskFut.onDone(restart(null));
+
+ stopAllGrids();
+
+ n = startGrid(0);
+ n.cluster().state(ACTIVE);
+
+ t = ((SimpleTask)tasks(n).get(t.name()).task());
+
+ t.onExecFut.get(getTestTimeout());
+ checkStateAndMetaStorage(n, t, STARTED, true);
+ t.taskFut.onDone(complete(null));
+ }
+
+ /**
+ * Check that the task will be restarted correctly.
+ *
+ * @param save Save to MetaStorage.
+ * @throws Exception If failed.
+ */
+ private void checkRestartTaskExecute(boolean save) throws Exception {
+ IgniteEx n = startGrid(0);
+ n.cluster().state(ACTIVE);
+
+ SimpleTask t = new SimpleTask("t");
+ IgniteInternalFuture<Void> execAsyncFut = execAsync(n, t, save);
+
+ t.onExecFut.get(getTestTimeout());
+ checkStateAndMetaStorage(n, t, STARTED, save);
+ assertFalse(execAsyncFut.isDone());
+
+ if (save) {
+ ObservingCheckpointListener checkpointLsnr = new ObservingCheckpointListener();
+ dbMgr(n).addCheckpointListener(checkpointLsnr);
+
+ dbMgr(n).enableCheckpoints(false).get(getTestTimeout());
+
+ t.taskFut.onDone(restart(null));
+ checkStateAndMetaStorage(n, t, INIT, true);
+ assertFalse(execAsyncFut.isDone());
+
+ GridFutureAdapter<Void> onMarkCheckpointBeginFut = checkpointLsnr.onMarkCheckpointBeginAsync(ctx -> {
+ checkStateAndMetaStorage(n, t, INIT, true);
+ assertFalse(toRmv(n).containsKey(t.name()));
+ });
+
+ dbMgr(n).enableCheckpoints(true).get(getTestTimeout());
+ onMarkCheckpointBeginFut.get(getTestTimeout());
+ }
+ else {
+ t.taskFut.onDone(restart(null));
+ checkStateAndMetaStorage(n, t, INIT, false);
+ assertFalse(execAsyncFut.isDone());
+ }
+
+ t.reset();
+
+ n.cluster().state(INACTIVE);
+ n.cluster().state(ACTIVE);
+
+ t.onExecFut.get(getTestTimeout());
+ checkStateAndMetaStorage(n, t, STARTED, save);
+ assertFalse(execAsyncFut.isDone());
+
+ t.taskFut.onDone(complete(null));
+ execAsyncFut.get(getTestTimeout());
+ }
+
+ /**
+ * Checking that the task will be processed correctly in the normal mode.
+ *
+ * @param save Save to MetaStorage.
+ * @throws Exception If failed.
+ */
+ private void checkSimpleTaskExecute(boolean save) throws Exception {
+ IgniteEx n = startGrid(0);
+
+ SimpleTask t = new SimpleTask("t");
+ IgniteInternalFuture<Void> execAsyncFut = execAsync(n, t, save);
+
+ checkStateAndMetaStorage(n, t, INIT, save);
+
+ checkExecuteSameTask(n, t);
+ checkStateAndMetaStorage(n, t, INIT, save);
+
+ assertFalse(t.onExecFut.isDone());
+ assertFalse(execAsyncFut.isDone());
+
+ n.cluster().state(ACTIVE);
+
+ t.onExecFut.get(getTestTimeout());
+
+ checkStateAndMetaStorage(n, t, STARTED, save);
+ assertFalse(execAsyncFut.isDone());
+
+ if (save) {
+ dbMgr(n).enableCheckpoints(false).get(getTestTimeout());
+
+ t.taskFut.onDone(complete(null));
+ execAsyncFut.get(getTestTimeout());
+
+ checkStateAndMetaStorage(n, t, COMPLETED, true);
+
+ ObservingCheckpointListener checkpointLsnr = new ObservingCheckpointListener();
+
+ GridFutureAdapter<Void> onMarkCheckpointBeginFut = checkpointLsnr.onMarkCheckpointBeginAsync(
+ ctx -> {
+ checkStateAndMetaStorage(n, t, null, true);
+ assertTrue(toRmv(n).containsKey(t.name()));
+ }
+ );
+
+ GridFutureAdapter<Void> afterCheckpointEndFut = checkpointLsnr.afterCheckpointEndAsync(
+ ctx -> {
+ checkStateAndMetaStorage(n, t, null, false);
+ assertFalse(toRmv(n).containsKey(t.name()));
+ }
+ );
+
+ dbMgr(n).addCheckpointListener(checkpointLsnr);
+ dbMgr(n).enableCheckpoints(true).get(getTestTimeout());
+
+ onMarkCheckpointBeginFut.get(getTestTimeout());
+ afterCheckpointEndFut.get(getTestTimeout());
+ }
+ else {
+ t.taskFut.onDone(complete(null));
+ execAsyncFut.get(getTestTimeout());
+
+ checkStateAndMetaStorage(n, t, null, false);
+ }
+ }
+
+ /**
+ * Checking that until the task is completed it is impossible to add a
+ * task with the same {@link DurableBackgroundTask#name name}.
+ *
+ * @param n Node.
+ * @param t Task.
+ */
+ private void checkExecuteSameTask(IgniteEx n, DurableBackgroundTask t) {
+ assertThrows(log, () -> execAsync(n, t, false), IllegalArgumentException.class, null);
+ assertThrows(log, () -> execAsync(n, t, true), IllegalArgumentException.class, null);
+ assertThrows(log, () -> execAsync(n, new SimpleTask(t.name()), false), IllegalArgumentException.class, null);
+ assertThrows(log, () -> execAsync(n, new SimpleTask(t.name()), true), IllegalArgumentException.class, null);
+ }
+
+ /**
+ * Checking the internal {@link DurableBackgroundTaskState state} of the task and storage in the MetaStorage.
+ *
+ * @param n Node.
+ * @param t Task.
+ * @param expState Expected state of the task, {@code null} means that the task should be absent.
+ * @param expSaved Task is expected to be stored in MetaStorage.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void checkStateAndMetaStorage(
+ IgniteEx n,
+ DurableBackgroundTask t,
+ @Nullable State expState,
+ boolean expSaved
+ ) throws IgniteCheckedException {
+ DurableBackgroundTaskState taskState = tasks(n).get(t.name());
+
+ if (expState == null)
+ assertNull(taskState);
+ else
+ assertEquals(expState, taskState.state());
+
+ DurableBackgroundTask ser = (DurableBackgroundTask)metaStorageOperation(n, ms -> ms.read(metaStorageKey(t)));
+
+ if (expSaved)
+ assertEquals(t.name(), ser.name());
+ else
+ assertNull(ser);
+ }
+
+ /**
+ * Asynchronous execution of a durable background task.
+ *
+ * @param n Node.
+ * @param t Task.
+ * @param save Save task to MetaStorage.
+ * @return Task future.
+ */
+ private IgniteInternalFuture<Void> execAsync(IgniteEx n, DurableBackgroundTask t, boolean save) {
+ return durableBackgroundTasksProcessor(n).executeAsync(t, save);
+ }
+
+ /**
+ * Getting {@code DurableBackgroundTasksProcessor#toRmv}.
+ *
+ * @param n Node.
+ * @return Map of tasks that will be removed from the MetaStorage.
+ */
+ private Map<String, DurableBackgroundTask> toRmv(IgniteEx n) {
+ return getFieldValue(durableBackgroundTasksProcessor(n), "toRmv");
+ }
+
+ /**
+ * Getting {@code DurableBackgroundTasksProcessor#tasks}.
+ *
+ * @param n Node.
+ * @return Task states map.
+ */
+ private Map<String, DurableBackgroundTaskState> tasks(IgniteEx n) {
+ return getFieldValue(durableBackgroundTasksProcessor(n), "tasks");
+ }
+
+ /**
+ * Getting durable background task processor.
+ *
+ * @param n Node.
+ * @return Durable background task processor.
+ */
+ private DurableBackgroundTasksProcessor durableBackgroundTasksProcessor(IgniteEx n) {
+ return n.context().durableBackgroundTasksProcessor();
+ }
+
+ /**
+ * Performing an operation on a MetaStorage.
+ *
+ * @param n Node.
+ * @param fun Function for working with MetaStorage, the argument can be {@code null}.
+ * @return The function result.
+ * @throws IgniteCheckedException If failed.
+ */
+ private <R> R metaStorageOperation(
+ IgniteEx n,
+ IgniteThrowableFunction<MetaStorage, R> fun
+ ) throws IgniteCheckedException {
+ GridCacheDatabaseSharedManager dbMgr = dbMgr(n);
+
+ dbMgr.checkpointReadLock();
+
+ try {
+ return fun.apply(dbMgr.metaStorage());
+ }
+ finally {
+ dbMgr.checkpointReadUnlock();
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/ObservingCheckpointListener.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/ObservingCheckpointListener.java
new file mode 100644
index 0000000..b86ed09
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/ObservingCheckpointListener.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.localtask;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
+
+/**
+ * Checkpoint listener that invokes caller-supplied consumers.
+ * Each consumer is cleared after a single use.
+ */
+public class ObservingCheckpointListener implements CheckpointListener {
+ /** On mark checkpoint begin consumer. */
+ volatile IgniteThrowableConsumer<Context> onMarkCheckpointBeginConsumer;
+
+ /** After checkpoint end consumer. */
+ volatile IgniteThrowableConsumer<Context> afterCheckpointEndConsumer;
+
+ /** {@inheritDoc} */
+ @Override public void onMarkCheckpointBegin(Context ctx) throws IgniteCheckedException {
+ IgniteThrowableConsumer<Context> consumer = onMarkCheckpointBeginConsumer;
+
+ if (consumer != null) {
+ onMarkCheckpointBeginConsumer = null;
+
+ consumer.accept(ctx);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onCheckpointBegin(Context ctx) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void beforeCheckpointBegin(Context ctx) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void afterCheckpointEnd(Context ctx) throws IgniteCheckedException {
+ IgniteThrowableConsumer<Context> consumer = afterCheckpointEndConsumer;
+
+ if (consumer != null) {
+ afterCheckpointEndConsumer = null;
+
+ consumer.accept(ctx);
+ }
+ }
+
+ /**
+ * Registers a consumer to be run as the {@link #onMarkCheckpointBeginConsumer}.
+ *
+ * @param consumer On mark checkpoint begin consumer.
+ * @return Future that completes when the consumer completes.
+ */
+ GridFutureAdapter<Void> onMarkCheckpointBeginAsync(IgniteThrowableConsumer<Context> consumer) {
+ GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
+
+ onMarkCheckpointBeginConsumer = asyncConsumer(consumer, fut);
+
+ return fut;
+ }
+
+ /**
+ * Registers a consumer to be run as the {@link #afterCheckpointEndConsumer}.
+ *
+ * @param consumer After checkpoint end consumer.
+ * @return Future that completes when the consumer completes.
+ */
+ GridFutureAdapter<Void> afterCheckpointEndAsync(IgniteThrowableConsumer<Context> consumer) {
+ GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
+
+ afterCheckpointEndConsumer = asyncConsumer(consumer, fut);
+
+ return fut;
+ }
+
+ /**
+ * Wraps the consumer so that its completion (or failure) is reported to the future.
+ *
+ * @param consumer Consumer.
+ * @param fut Future that completes when the consumer completes.
+ * @return New consumer.
+ */
+ private static IgniteThrowableConsumer<Context> asyncConsumer(
+ IgniteThrowableConsumer<Context> consumer,
+ GridFutureAdapter<Void> fut
+ ) {
+ return ctx -> {
+ Throwable err = null;
+
+ try {
+ consumer.accept(ctx);
+ }
+ catch (Throwable t) {
+ err = t;
+ }
+
+ fut.onDone(err);
+ };
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/SimpleTask.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/SimpleTask.java
new file mode 100644
index 0000000..8d14ea4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/SimpleTask.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.localtask;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.dto.IgniteDataTransferObject;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Simple {@link DurableBackgroundTask} implementation for tests; its execution and cancellation are exposed through futures.
+ */
+class SimpleTask extends IgniteDataTransferObject implements DurableBackgroundTask {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Task name, the only field written by {@link #writeExternalData} and restored by {@link #readExternalData}. */
+ private String name;
+
+ /** Future that is completed at the beginning of {@link #executeAsync}. */
+ final GridFutureAdapter<Void> onExecFut = new GridFutureAdapter<>();
+
+ /** Future that is returned from {@link #executeAsync}; this class itself never completes it. */
+ final GridFutureAdapter<DurableBackgroundTaskResult> taskFut = new GridFutureAdapter<>();
+
+ /** Future that is completed when {@link #cancel} is called. */
+ final GridFutureAdapter<Void> onCancelFut = new GridFutureAdapter<>();
+
+ /**
+ * Default constructor, leaves the task name {@code null} (presumably needed for deserialization; confirm).
+ */
+ public SimpleTask() {
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param name Task name.
+ */
+ public SimpleTask(String name) {
+ this.name = name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancel() {
+ onCancelFut.onDone(); // Only records the call; taskFut is not touched here.
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<DurableBackgroundTaskResult> executeAsync(GridKernalContext ctx) {
+ onExecFut.onDone(); // Signals that execution was requested; ctx is not used.
+
+ return taskFut;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeExternalData(ObjectOutput out) throws IOException {
+ U.writeLongString(out, name);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void readExternalData(
+ byte protoVer,
+ ObjectInput in
+ ) throws IOException, ClassNotFoundException {
+ name = U.readLongString(in); // protoVer is ignored: the name is the only serialized value.
+ }
+
+ /**
+ * Resets {@link #onExecFut} and {@link #taskFut}; {@link #onCancelFut} is not reset.
+ */
+ void reset() {
+ onExecFut.reset();
+ taskFut.reset();
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index a7f4c74..0f02435 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -2714,4 +2714,14 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
return Optional.ofNullable(n.cachex(cacheName)).map(IgniteInternalCache::context)
.map(GridCacheContext::cache).map(GridCacheAdapter::metrics0).orElse(null);
}
+
+ /**
+ * Returns the database manager of the given node, cast to {@link GridCacheDatabaseSharedManager}.
+ *
+ * @param n Node.
+ * @return Database manager obtained through {@code n.context().cache().context().database()}.
+ */
+ protected GridCacheDatabaseSharedManager dbMgr(IgniteEx n) {
+ return (GridCacheDatabaseSharedManager)n.context().cache().context().database();
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
index 73b4880..d5667fd 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
@@ -75,6 +75,7 @@ import org.apache.ignite.internal.processors.database.IgniteDbPutGetWithCacheSto
import org.apache.ignite.internal.processors.database.IgniteDbSingleNodePutGetTest;
import org.apache.ignite.internal.processors.database.IgniteDbSingleNodeTinyPutGetTest;
import org.apache.ignite.internal.processors.diagnostic.DiagnosticProcessorTest;
+import org.apache.ignite.internal.processors.localtask.DurableBackgroundTasksProcessorSelfTest;
import org.apache.ignite.internal.processors.metastorage.DistributedMetaStoragePersistentTest;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.DynamicSuite;
@@ -212,5 +213,7 @@ public class IgnitePdsTestSuite {
GridTestUtils.addTestIfNeeded(suite, ActiveOnStartPropertyTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, AutoActivationPropertyTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, ClusterStateOnStartPropertyTest.class, ignoredTests);
+
+ GridTestUtils.addTestIfNeeded(suite, DurableBackgroundTasksProcessorSelfTest.class, ignoredTests);
}
}