You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/04/29 07:47:15 UTC

[GitHub] [ignite] ibessonov commented on a change in pull request #9037: IGNITE-14625

ibessonov commented on a change in pull request #9037:
URL: https://github.com/apache/ignite/pull/9037#discussion_r622801876



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/DurableBackgroundCleanupIndexTreeTask.java
##########
@@ -98,12 +110,99 @@ public DurableBackgroundCleanupIndexTreeTask(
     }
 
     /** {@inheritDoc} */
-    @Override public String shortName() {
+    @Override public String name() {
         return "DROP_SQL_INDEX-" + schemaName + "." + idxName + "-" + id;
     }
 
     /** {@inheritDoc} */
-    @Override public void execute(GridKernalContext ctx) {
+    @Override public IgniteInternalFuture<DurableBackgroundTaskResult> executeAsync(GridKernalContext ctx) {
+        log = ctx.log(this.getClass());
+
+        assert worker == null;
+
+        GridFutureAdapter<DurableBackgroundTaskResult> fut = new GridFutureAdapter<>();
+
+        worker = new GridWorker(
+            ctx.igniteInstanceName(),
+            "async-durable-background-task-executor-" + name(),
+            log
+        ) {
+            /** {@inheritDoc} */
+            @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+                try {
+                    execute(ctx);
+
+                    worker = null;
+
+                    fut.onDone(DurableBackgroundTaskResult.complete(null));
+                }
+                catch (Throwable t) {
+                    worker = null;
+
+                    fut.onDone(DurableBackgroundTaskResult.restart(t));
+                }
+            }
+        };
+
+        new IgniteThread(worker).start();
+
+        return fut;
+    }
+
+    /**
+     * Checks that pageId is still relevant and has not been deleted / reused.
+     * @param grpId Cache group id.
+     * @param pageMem Page memory instance.
+     * @param rootPageId Root page identifier.
+     * @return {@code true} if root page was deleted/reused, {@code false} otherwise.
+     */
+    private boolean skipDeletedRoot(int grpId, PageMemory pageMem, long rootPageId) {
+        try {
+            long page = pageMem.acquirePage(grpId, rootPageId);
+
+            try {
+                long pageAddr = pageMem.readLock(grpId, rootPageId, page);
+
+                try {
+                    return pageAddr == 0;
+                }
+                finally {
+                    pageMem.readUnlock(grpId, rootPageId, page);
+                }
+            }
+            finally {
+                pageMem.releasePage(grpId, rootPageId, page);
+            }
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException("Cannot acquire tree root page.", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel() {
+        trees = null;
+
+        GridWorker w = worker;
+
+        if (w != null) {
+            worker = null;
+
+            U.awaitForWorkersStop(singleton(w), true, log);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DurableBackgroundCleanupIndexTreeTask.class, this);
+    }
+
+    /**
+     * Task execution.
+     *
+     * @param ctx Kernal context.
+     */
+    private void execute(GridKernalContext ctx) {

Review comment:
       Please move this method above **skipDeletedRoot** so that it won't look like you moved a bunch of methods to another place in the file. Just take a look at the diff here on GitHub to see what I mean.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
##########
@@ -16,315 +16,342 @@
  */
 package org.apache.ignite.internal.processors.localtask;
 
-import java.util.Map;
-import java.util.Set;
+import java.util.Iterator;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.client.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
-import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageTree;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.thread.IgniteThread;
 
-import static org.apache.ignite.internal.util.IgniteUtils.awaitForWorkersStop;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.COMPLETED;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.INIT;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.PREPARE;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.STARTED;
 
 /**
- * Processor that is responsible for durable background tasks that are executed on local node
- * and should be continued even after node restart.
+ * Processor that is responsible for durable background tasks that are executed on local node.
  */
 public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implements MetastorageLifecycleListener,
     CheckpointListener {
     /** Prefix for metastorage keys for durable background tasks. */
-    private static final String STORE_DURABLE_BACKGROUND_TASK_PREFIX = "durable-background-task-";
+    private static final String TASK_PREFIX = "durable-background-task-";
 
-    /** Metastorage. */
-    private volatile ReadWriteMetastorage metastorage;
-
-    /** Metastorage synchronization mutex. */
+    /** MetaStorage synchronization mutex. */
     private final Object metaStorageMux = new Object();
 
-    /** Set of workers that executing durable background tasks. */
-    private final Set<GridWorker> asyncDurableBackgroundTaskWorkers = new GridConcurrentHashSet<>();
-
-    /** Count of workers that executing durable background tasks. */
-    private final AtomicInteger asyncDurableBackgroundTasksWorkersCntr = new AtomicInteger(0);
+    /** Current tasks. Mapping: {@link DurableBackgroundTask#name task name} -> task state. */
+    private final ConcurrentMap<String, DurableBackgroundTaskState> tasks = new ConcurrentHashMap<>();
 
-    /** Durable background tasks map. */
-    private final ConcurrentHashMap<String, DurableBackgroundTask> durableBackgroundTasks = new ConcurrentHashMap<>();
-
-    /** Set of started tasks' names. */
-    private final Set<String> startedTasks = new GridConcurrentHashSet<>();
+    /** Lock for canceling tasks. */
+    private final ReadWriteLock cancelLock = new ReentrantReadWriteLock(true);
 
     /**
-     * Ban to start new tasks. The first time the cluster is activated, it will try again to run existing tasks.
-     *
-     *  @see #onStateChangeFinish(ChangeGlobalStateFinishMessage)
+     * Tasks to be removed from the MetaStorage after the end of a checkpoint.
+     * Mapping: {@link DurableBackgroundTask#name task name} -> task.
      */
-    private volatile boolean forbidStartingNewTasks;
+    private final ConcurrentMap<String, DurableBackgroundTask> toRmv = new ConcurrentHashMap<>();
+
+    /** Prohibiting the execution of tasks. */
+    private volatile boolean prohibitionExecTasks = true;
 
     /**
+     * Constructor.
+     *
      * @param ctx Kernal context.
      */
     public DurableBackgroundTasksProcessor(GridKernalContext ctx) {
         super(ctx);
     }
 
-    /**
-     * Starts the asynchronous operation of pending tasks execution. Is called on start.
-     */
-    private void asyncDurableBackgroundTasksExecution() {
-        assert durableBackgroundTasks != null;
-
-        for (DurableBackgroundTask task : durableBackgroundTasks.values()) {
-            if (!task.isCompleted() && startedTasks.add(task.shortName()))
-                asyncDurableBackgroundTaskExecute(task);
-        }
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
     }
 
-    /**
-     * Creates a worker to execute single durable background task.
-     *
-     * @param task Task.
-     */
-    private void asyncDurableBackgroundTaskExecute(DurableBackgroundTask task) {
-        String workerName = "async-durable-background-task-executor-" + asyncDurableBackgroundTasksWorkersCntr.getAndIncrement();
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        cancelTasks();
+    }
 
-        GridWorker worker = new GridWorker(ctx.igniteInstanceName(), workerName, log) {
-            @Override public void cancel() {
-                task.onCancel();
+    /** {@inheritDoc} */
+    @Override public void onReadyForRead(ReadOnlyMetastorage metastorage) {
+        metaStorageOperation(metaStorage -> {
+            assert metaStorage != null;
+
+            metaStorage.iterate(
+                TASK_PREFIX,
+                (k, v) -> {
+                    DurableBackgroundTask t = (DurableBackgroundTask)v;
+
+                    tasks.put(t.name(), new DurableBackgroundTaskState(t, null, true));
+                },
+                true
+            );
+        });
+    }
 
-                super.cancel();
-            }
+    /** {@inheritDoc} */
+    @Override public void onReadyForReadWrite(ReadWriteMetastorage metastorage) {
+        ((GridCacheDatabaseSharedManager)ctx.cache().context().database()).addCheckpointListener(this);
+    }
 
-            @Override protected void body() {
-                try {
-                    if (forbidStartingNewTasks)
-                        return;
+    /** {@inheritDoc} */
+    @Override public void beforeCheckpointBegin(Context ctx) {
+        /* No op. */
+    }
 
-                    log.info("Executing durable background task: " + task.shortName());
+    /** {@inheritDoc} */
+    @Override public void onMarkCheckpointBegin(Context ctx) {
+        for (Iterator<Entry<String, DurableBackgroundTaskState>> it = tasks.entrySet().iterator(); it.hasNext(); ) {

Review comment:
       Any locks here?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
##########
@@ -16,315 +16,342 @@
  */
 package org.apache.ignite.internal.processors.localtask;
 
-import java.util.Map;
-import java.util.Set;
+import java.util.Iterator;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.client.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
-import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageTree;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.thread.IgniteThread;
 
-import static org.apache.ignite.internal.util.IgniteUtils.awaitForWorkersStop;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.COMPLETED;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.INIT;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.PREPARE;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.STARTED;
 
 /**
- * Processor that is responsible for durable background tasks that are executed on local node
- * and should be continued even after node restart.
+ * Processor that is responsible for durable background tasks that are executed on local node.
  */
 public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implements MetastorageLifecycleListener,
     CheckpointListener {
     /** Prefix for metastorage keys for durable background tasks. */
-    private static final String STORE_DURABLE_BACKGROUND_TASK_PREFIX = "durable-background-task-";
+    private static final String TASK_PREFIX = "durable-background-task-";
 
-    /** Metastorage. */
-    private volatile ReadWriteMetastorage metastorage;
-
-    /** Metastorage synchronization mutex. */
+    /** MetaStorage synchronization mutex. */
     private final Object metaStorageMux = new Object();
 
-    /** Set of workers that executing durable background tasks. */
-    private final Set<GridWorker> asyncDurableBackgroundTaskWorkers = new GridConcurrentHashSet<>();
-
-    /** Count of workers that executing durable background tasks. */
-    private final AtomicInteger asyncDurableBackgroundTasksWorkersCntr = new AtomicInteger(0);
+    /** Current tasks. Mapping: {@link DurableBackgroundTask#name task name} -> task state. */
+    private final ConcurrentMap<String, DurableBackgroundTaskState> tasks = new ConcurrentHashMap<>();
 
-    /** Durable background tasks map. */
-    private final ConcurrentHashMap<String, DurableBackgroundTask> durableBackgroundTasks = new ConcurrentHashMap<>();
-
-    /** Set of started tasks' names. */
-    private final Set<String> startedTasks = new GridConcurrentHashSet<>();
+    /** Lock for canceling tasks. */
+    private final ReadWriteLock cancelLock = new ReentrantReadWriteLock(true);
 
     /**
-     * Ban to start new tasks. The first time the cluster is activated, it will try again to run existing tasks.
-     *
-     *  @see #onStateChangeFinish(ChangeGlobalStateFinishMessage)
+     * Tasks to be removed from the MetaStorage after the end of a checkpoint.
+     * Mapping: {@link DurableBackgroundTask#name task name} -> task.
      */
-    private volatile boolean forbidStartingNewTasks;
+    private final ConcurrentMap<String, DurableBackgroundTask> toRmv = new ConcurrentHashMap<>();
+
+    /** Prohibiting the execution of tasks. */
+    private volatile boolean prohibitionExecTasks = true;
 
     /**
+     * Constructor.
+     *
      * @param ctx Kernal context.
      */
     public DurableBackgroundTasksProcessor(GridKernalContext ctx) {
         super(ctx);
     }
 
-    /**
-     * Starts the asynchronous operation of pending tasks execution. Is called on start.
-     */
-    private void asyncDurableBackgroundTasksExecution() {
-        assert durableBackgroundTasks != null;
-
-        for (DurableBackgroundTask task : durableBackgroundTasks.values()) {
-            if (!task.isCompleted() && startedTasks.add(task.shortName()))
-                asyncDurableBackgroundTaskExecute(task);
-        }
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
     }
 
-    /**
-     * Creates a worker to execute single durable background task.
-     *
-     * @param task Task.
-     */
-    private void asyncDurableBackgroundTaskExecute(DurableBackgroundTask task) {
-        String workerName = "async-durable-background-task-executor-" + asyncDurableBackgroundTasksWorkersCntr.getAndIncrement();
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        cancelTasks();
+    }
 
-        GridWorker worker = new GridWorker(ctx.igniteInstanceName(), workerName, log) {
-            @Override public void cancel() {
-                task.onCancel();
+    /** {@inheritDoc} */
+    @Override public void onReadyForRead(ReadOnlyMetastorage metastorage) {
+        metaStorageOperation(metaStorage -> {
+            assert metaStorage != null;
+
+            metaStorage.iterate(
+                TASK_PREFIX,
+                (k, v) -> {
+                    DurableBackgroundTask t = (DurableBackgroundTask)v;
+
+                    tasks.put(t.name(), new DurableBackgroundTaskState(t, null, true));
+                },
+                true
+            );
+        });
+    }
 
-                super.cancel();
-            }
+    /** {@inheritDoc} */
+    @Override public void onReadyForReadWrite(ReadWriteMetastorage metastorage) {
+        ((GridCacheDatabaseSharedManager)ctx.cache().context().database()).addCheckpointListener(this);
+    }
 
-            @Override protected void body() {
-                try {
-                    if (forbidStartingNewTasks)
-                        return;
+    /** {@inheritDoc} */
+    @Override public void beforeCheckpointBegin(Context ctx) {
+        /* No op. */
+    }
 
-                    log.info("Executing durable background task: " + task.shortName());
+    /** {@inheritDoc} */
+    @Override public void onMarkCheckpointBegin(Context ctx) {
+        for (Iterator<Entry<String, DurableBackgroundTaskState>> it = tasks.entrySet().iterator(); it.hasNext(); ) {
+            DurableBackgroundTaskState taskState = it.next().getValue();
 
-                    task.execute(ctx);
+            if (taskState.state() == COMPLETED) {
+                assert taskState.saved();
 
-                    task.complete();
+                DurableBackgroundTask t = taskState.task();
 
-                    log.info("Execution of durable background task completed: " + task.shortName());
-                }
-                catch (Throwable e) {
-                    log.error("Could not execute durable background task: " + task.shortName(), e);
-                }
-                finally {
-                    startedTasks.remove(task.shortName());
+                toRmv.put(t.name(), t);
 
-                    asyncDurableBackgroundTaskWorkers.remove(this);
-                }
+                it.remove();
             }
-        };
-
-        asyncDurableBackgroundTaskWorkers.add(worker);
-
-        Thread asyncTask = new IgniteThread(worker);
-
-        asyncTask.start();
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public void onKernalStart(boolean active) {
-        asyncDurableBackgroundTasksExecution();
+    @Override public void onCheckpointBegin(Context ctx) {
+        /* No op. */
     }
 
     /** {@inheritDoc} */
-    @Override public void onKernalStop(boolean cancel) {
-        forbidStartingNewTasks = true;
+    @Override public void afterCheckpointEnd(Context ctx) {
+        for (Iterator<Entry<String, DurableBackgroundTask>> it = toRmv.entrySet().iterator(); it.hasNext(); ) {
+            DurableBackgroundTask t = it.next().getValue();
 
-        awaitForWorkersStop(asyncDurableBackgroundTaskWorkers, true, log);
-    }
+            metaStorageOperation(metaStorage -> {
+                if (metaStorage != null && toRmv.containsKey(t.name()))
+                    metaStorage.remove(metaStorageKey(t));
+            });
 
-    /** {@inheritDoc} */
-    @Override public void start() throws IgniteCheckedException {
-        ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
+            it.remove();
+        }
     }
 
     /**
-     * @param msg Message.
+     * Callback at the start of a global state change.
+     *
+     * @param msg Message for change cluster global state.
      */
-    public void onStateChange(ChangeGlobalStateMessage msg) {
-        if (msg.state() == ClusterState.INACTIVE) {
-            forbidStartingNewTasks = true;
-
-            awaitForWorkersStop(asyncDurableBackgroundTaskWorkers, true, log);
-        }
+    public void onStateChangeStarted(ChangeGlobalStateMessage msg) {
+        if (msg.state() == ClusterState.INACTIVE)
+            cancelTasks();
     }
 
     /**
-     * @param msg Message.
+     * Callback on finish of a global state change.
+     *
+     * @param msg Finish message for change cluster global state.
      */
     public void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) {
         if (msg.state() != ClusterState.INACTIVE) {
-            forbidStartingNewTasks = false;
-
-            asyncDurableBackgroundTasksExecution();
-        }
-    }
+            prohibitionExecTasks = false;
 
-    /** {@inheritDoc} */
-    @Override public void onReadyForRead(ReadOnlyMetastorage metastorage) {
-        synchronized (metaStorageMux) {
-            if (durableBackgroundTasks.isEmpty()) {
-                try {
-                    metastorage.iterate(
-                        STORE_DURABLE_BACKGROUND_TASK_PREFIX,
-                        (key, val) -> durableBackgroundTasks.put(key, (DurableBackgroundTask)val),
-                        true
-                    );
-                }
-                catch (IgniteCheckedException e) {
-                    throw new IgniteException("Failed to iterate durable background tasks storage.", e);
-                }
+            for (DurableBackgroundTaskState taskState : tasks.values()) {
+                if (!prohibitionExecTasks)
+                    executeAsync0(taskState.task());
             }
         }
     }
 
-    /** {@inheritDoc} */
-    @Override public void onReadyForReadWrite(ReadWriteMetastorage metastorage) {
-        synchronized (metaStorageMux) {
-            try {
-                for (Map.Entry<String, DurableBackgroundTask> entry : durableBackgroundTasks.entrySet()) {
-                    if (metastorage.readRaw(entry.getKey()) == null)
-                        metastorage.write(entry.getKey(), entry.getValue());
-                }
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException("Failed to read key from durable background tasks storage.", e);
-            }
+    /**
+     * Asynchronous execution of a durable background task.
+     *
+     * A new task will be added for execution either if there is no task with
+     * the same {@link DurableBackgroundTask#name name} or it (previous) will be completed.
+     *
+     * If the task is required to be completed after restarting the node,
+     * then it must be saved to the MetaStorage.
+     *
+     * If the task is saved to the Metastorage, then it will be deleted from it
+     * only after its completion and at the end of the checkpoint. Otherwise, it
+     * will be removed as soon as it is completed.
+     *
+     * @param task Durable background task.
+     * @param save Save task to MetaStorage.
+     * @return Futures that will complete when the task is completed.
+     */
+    public IgniteInternalFuture<Void> executeAsync(DurableBackgroundTask task, boolean save) {
+        DurableBackgroundTaskState taskState = tasks.compute(task.name(), (taskName, prev) -> {
+            if (prev != null && prev.state() != COMPLETED)
+                throw new IllegalArgumentException("Task is already present and has not been completed: " + taskName);
+
+            if (save)
+                toRmv.remove(taskName);

Review comment:
       I see now. Do we need some locks to avoid races?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
##########
@@ -16,315 +16,342 @@
  */
 package org.apache.ignite.internal.processors.localtask;
 
-import java.util.Map;
-import java.util.Set;
+import java.util.Iterator;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.client.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
-import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageTree;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.thread.IgniteThread;
 
-import static org.apache.ignite.internal.util.IgniteUtils.awaitForWorkersStop;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.COMPLETED;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.INIT;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.PREPARE;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.STARTED;
 
 /**
- * Processor that is responsible for durable background tasks that are executed on local node
- * and should be continued even after node restart.
+ * Processor that is responsible for durable background tasks that are executed on local node.
  */
 public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implements MetastorageLifecycleListener,
     CheckpointListener {
     /** Prefix for metastorage keys for durable background tasks. */
-    private static final String STORE_DURABLE_BACKGROUND_TASK_PREFIX = "durable-background-task-";
+    private static final String TASK_PREFIX = "durable-background-task-";
 
-    /** Metastorage. */
-    private volatile ReadWriteMetastorage metastorage;
-
-    /** Metastorage synchronization mutex. */
+    /** MetaStorage synchronization mutex. */
     private final Object metaStorageMux = new Object();
 
-    /** Set of workers that executing durable background tasks. */
-    private final Set<GridWorker> asyncDurableBackgroundTaskWorkers = new GridConcurrentHashSet<>();
-
-    /** Count of workers that executing durable background tasks. */
-    private final AtomicInteger asyncDurableBackgroundTasksWorkersCntr = new AtomicInteger(0);
+    /** Current tasks. Mapping: {@link DurableBackgroundTask#name task name} -> task state. */
+    private final ConcurrentMap<String, DurableBackgroundTaskState> tasks = new ConcurrentHashMap<>();
 
-    /** Durable background tasks map. */
-    private final ConcurrentHashMap<String, DurableBackgroundTask> durableBackgroundTasks = new ConcurrentHashMap<>();
-
-    /** Set of started tasks' names. */
-    private final Set<String> startedTasks = new GridConcurrentHashSet<>();
+    /** Lock for canceling tasks. */
+    private final ReadWriteLock cancelLock = new ReentrantReadWriteLock(true);
 
     /**
-     * Ban to start new tasks. The first time the cluster is activated, it will try again to run existing tasks.
-     *
-     *  @see #onStateChangeFinish(ChangeGlobalStateFinishMessage)
+     * Tasks to be removed from the MetaStorage after the end of a checkpoint.
+     * Mapping: {@link DurableBackgroundTask#name task name} -> task.
      */
-    private volatile boolean forbidStartingNewTasks;
+    private final ConcurrentMap<String, DurableBackgroundTask> toRmv = new ConcurrentHashMap<>();
+
+    /** Prohibiting the execution of tasks. */
+    private volatile boolean prohibitionExecTasks = true;
 
     /**
+     * Constructor.
+     *
      * @param ctx Kernal context.
      */
     public DurableBackgroundTasksProcessor(GridKernalContext ctx) {
         super(ctx);
     }
 
-    /**
-     * Starts the asynchronous operation of pending tasks execution. Is called on start.
-     */
-    private void asyncDurableBackgroundTasksExecution() {
-        assert durableBackgroundTasks != null;
-
-        for (DurableBackgroundTask task : durableBackgroundTasks.values()) {
-            if (!task.isCompleted() && startedTasks.add(task.shortName()))
-                asyncDurableBackgroundTaskExecute(task);
-        }
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
     }
 
-    /**
-     * Creates a worker to execute single durable background task.
-     *
-     * @param task Task.
-     */
-    private void asyncDurableBackgroundTaskExecute(DurableBackgroundTask task) {
-        String workerName = "async-durable-background-task-executor-" + asyncDurableBackgroundTasksWorkersCntr.getAndIncrement();
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        cancelTasks();
+    }
 
-        GridWorker worker = new GridWorker(ctx.igniteInstanceName(), workerName, log) {
-            @Override public void cancel() {
-                task.onCancel();
+    /** {@inheritDoc} */
+    @Override public void onReadyForRead(ReadOnlyMetastorage metastorage) {
+        metaStorageOperation(metaStorage -> {
+            assert metaStorage != null;
+
+            metaStorage.iterate(
+                TASK_PREFIX,
+                (k, v) -> {
+                    DurableBackgroundTask t = (DurableBackgroundTask)v;
+
+                    tasks.put(t.name(), new DurableBackgroundTaskState(t, null, true));
+                },
+                true
+            );
+        });
+    }
 
-                super.cancel();
-            }
+    /** {@inheritDoc} */
+    @Override public void onReadyForReadWrite(ReadWriteMetastorage metastorage) {
+        ((GridCacheDatabaseSharedManager)ctx.cache().context().database()).addCheckpointListener(this);
+    }
 
-            @Override protected void body() {
-                try {
-                    if (forbidStartingNewTasks)
-                        return;
+    /** {@inheritDoc} */
+    @Override public void beforeCheckpointBegin(Context ctx) {
+        /* No op. */
+    }
 
-                    log.info("Executing durable background task: " + task.shortName());
+    /** {@inheritDoc} */
+    @Override public void onMarkCheckpointBegin(Context ctx) {
+        for (Iterator<Entry<String, DurableBackgroundTaskState>> it = tasks.entrySet().iterator(); it.hasNext(); ) {
+            DurableBackgroundTaskState taskState = it.next().getValue();
 
-                    task.execute(ctx);
+            if (taskState.state() == COMPLETED) {
+                assert taskState.saved();
 
-                    task.complete();
+                DurableBackgroundTask t = taskState.task();
 
-                    log.info("Execution of durable background task completed: " + task.shortName());
-                }
-                catch (Throwable e) {
-                    log.error("Could not execute durable background task: " + task.shortName(), e);
-                }
-                finally {
-                    startedTasks.remove(task.shortName());
+                toRmv.put(t.name(), t);
 
-                    asyncDurableBackgroundTaskWorkers.remove(this);
-                }
+                it.remove();
             }
-        };
-
-        asyncDurableBackgroundTaskWorkers.add(worker);
-
-        Thread asyncTask = new IgniteThread(worker);
-
-        asyncTask.start();
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public void onKernalStart(boolean active) {
-        asyncDurableBackgroundTasksExecution();
+    @Override public void onCheckpointBegin(Context ctx) {
+        /* No op. */
     }
 
     /** {@inheritDoc} */
-    @Override public void onKernalStop(boolean cancel) {
-        forbidStartingNewTasks = true;
+    @Override public void afterCheckpointEnd(Context ctx) {
+        for (Iterator<Entry<String, DurableBackgroundTask>> it = toRmv.entrySet().iterator(); it.hasNext(); ) {
+            DurableBackgroundTask t = it.next().getValue();
 
-        awaitForWorkersStop(asyncDurableBackgroundTaskWorkers, true, log);
-    }
+            metaStorageOperation(metaStorage -> {
+                if (metaStorage != null && toRmv.containsKey(t.name()))
+                    metaStorage.remove(metaStorageKey(t));
+            });
 
-    /** {@inheritDoc} */
-    @Override public void start() throws IgniteCheckedException {
-        ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
+            it.remove();
+        }
     }
 
     /**
-     * @param msg Message.
+     * Callback at the start of a global state change.
+     *
+     * @param msg Message for change cluster global state.
      */
-    public void onStateChange(ChangeGlobalStateMessage msg) {
-        if (msg.state() == ClusterState.INACTIVE) {
-            forbidStartingNewTasks = true;
-
-            awaitForWorkersStop(asyncDurableBackgroundTaskWorkers, true, log);
-        }
+    public void onStateChangeStarted(ChangeGlobalStateMessage msg) {
+        if (msg.state() == ClusterState.INACTIVE)
+            cancelTasks();
     }
 
     /**
-     * @param msg Message.
+     * Callback on finish of a global state change.
+     *
+     * @param msg Finish message for change cluster global state.
      */
     public void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) {
         if (msg.state() != ClusterState.INACTIVE) {
-            forbidStartingNewTasks = false;
-
-            asyncDurableBackgroundTasksExecution();
-        }
-    }
+            prohibitionExecTasks = false;
 
-    /** {@inheritDoc} */
-    @Override public void onReadyForRead(ReadOnlyMetastorage metastorage) {
-        synchronized (metaStorageMux) {
-            if (durableBackgroundTasks.isEmpty()) {
-                try {
-                    metastorage.iterate(
-                        STORE_DURABLE_BACKGROUND_TASK_PREFIX,
-                        (key, val) -> durableBackgroundTasks.put(key, (DurableBackgroundTask)val),
-                        true
-                    );
-                }
-                catch (IgniteCheckedException e) {
-                    throw new IgniteException("Failed to iterate durable background tasks storage.", e);
-                }
+            for (DurableBackgroundTaskState taskState : tasks.values()) {
+                if (!prohibitionExecTasks)
+                    executeAsync0(taskState.task());
             }
         }
     }
 
-    /** {@inheritDoc} */
-    @Override public void onReadyForReadWrite(ReadWriteMetastorage metastorage) {
-        synchronized (metaStorageMux) {
-            try {
-                for (Map.Entry<String, DurableBackgroundTask> entry : durableBackgroundTasks.entrySet()) {
-                    if (metastorage.readRaw(entry.getKey()) == null)
-                        metastorage.write(entry.getKey(), entry.getValue());
-                }
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException("Failed to read key from durable background tasks storage.", e);
-            }
+    /**
+     * Asynchronous execution of a durable background task.
+     *
+     * A new task will be added for execution either if there is no task with
+     * the same {@link DurableBackgroundTask#name name} or it (previous) will be completed.
+     *
+     * If the task is required to be completed after restarting the node,
+     * then it must be saved to the MetaStorage.
+     *
+     * If the task is saved to the Metastorage, then it will be deleted from it
+     * only after its completion and at the end of the checkpoint. Otherwise, it
+     * will be removed as soon as it is completed.
+     *
+     * @param task Durable background task.
+     * @param save Save task to MetaStorage.
+     * @return Futures that will complete when the task is completed.
+     */
+    public IgniteInternalFuture<Void> executeAsync(DurableBackgroundTask task, boolean save) {
+        DurableBackgroundTaskState taskState = tasks.compute(task.name(), (taskName, prev) -> {
+            if (prev != null && prev.state() != COMPLETED)
+                throw new IllegalArgumentException("Task is already present and has not been completed: " + taskName);
+
+            if (save)
+                toRmv.remove(taskName);
+
+            return new DurableBackgroundTaskState(task, new GridFutureAdapter<>(), save);
+        });
+
+        if (save) {
+            metaStorageOperation(metaStorage -> {
+                if (metaStorage != null)
+                    metaStorage.write(metaStorageKey(task), task);
+            });
         }
 
-        ((GridCacheDatabaseSharedManager)ctx.cache().context().database()).addCheckpointListener(this);
+        if (!prohibitionExecTasks)
+            executeAsync0(task);
 
-        this.metastorage = metastorage;
+        return taskState.outFuture();
     }
 
     /**
-     * Builds a metastorage key for durable background task object.
+     * Overloading the {@link #executeAsync(DurableBackgroundTask, boolean)}.
+     * If task is applied to persistent cache, saves it to MetaStorage.
      *
-     * @param obj Object.
-     * @return Metastorage key.
+     * @param t Durable background task.
+     * @param cacheCfg Cache configuration.
+     * @return Futures that will complete when the task is completed.
      */
-    private String durableBackgroundTaskMetastorageKey(DurableBackgroundTask obj) {
-        String k = STORE_DURABLE_BACKGROUND_TASK_PREFIX + obj.shortName();
-
-        if (k.length() > MetastorageTree.MAX_KEY_LEN) {
-            int hashLenLimit = 5;
-
-            String hash = String.valueOf(k.hashCode());
-
-            k = k.substring(0, MetastorageTree.MAX_KEY_LEN - hashLenLimit) +
-                (hash.length() > hashLenLimit ? hash.substring(0, hashLenLimit) : hash);
-        }
-
-        return k;
+    public IgniteInternalFuture<Void> executeAsync(DurableBackgroundTask t, CacheConfiguration cacheCfg) {
+        return executeAsync(t, CU.isPersistentCache(cacheCfg, ctx.config().getDataStorageConfiguration()));
     }
 
     /**
-     * Adds durable background task object.
+     * Asynchronous execution of a durable background task.
      *
-     * @param obj Object.
+     * @param t Durable background task.
      */
-    private void addDurableBackgroundTask(DurableBackgroundTask obj) {
-        String objName = durableBackgroundTaskMetastorageKey(obj);
+    private void executeAsync0(DurableBackgroundTask t) {
+        cancelLock.readLock().lock();
 
-        synchronized (metaStorageMux) {
-            durableBackgroundTasks.put(objName, obj);
-
-            if (metastorage != null) {
-                ctx.cache().context().database().checkpointReadLock();
-
-                try {
-                    metastorage.write(objName, obj);
-                }
-                catch (IgniteCheckedException e) {
-                    throw new IgniteException(e);
-                }
-                finally {
-                    ctx.cache().context().database().checkpointReadUnlock();
-                }
+        try {
+            DurableBackgroundTaskState taskState = tasks.get(t.name());
+
+            if (taskState != null && taskState.state(INIT, PREPARE)) {
+                if (log.isInfoEnabled())
+                    log.info("Executing durable background task: " + t.name());
+
+                t.executeAsync(ctx).listen(f -> {
+                    DurableBackgroundTaskResult res;
+
+                    try {
+                        res = f.get();
+
+                        assert res != null;
+                    }
+                    catch (Throwable e) {
+                        throw new AssertionError("Task completed with an error", e);

Review comment:
       Oof, don't we have something more appropriate than throwing an `AssertionError` here?

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessorSelfTest.java
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.localtask;
+
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
+import org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteThrowableFunction;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.INACTIVE;
+import static org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult.complete;
+import static org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult.restart;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.COMPLETED;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.INIT;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.STARTED;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTasksProcessor.metaStorageKey;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+
+/**
+ * Class for testing the {@link DurableBackgroundTasksProcessor}.
+ */
+public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractTest {

Review comment:
       Do we force checkpoints anywhere in this test?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/DurableBackgroundCleanupIndexTreeTask.java
##########
@@ -98,12 +110,99 @@ public DurableBackgroundCleanupIndexTreeTask(
     }
 
     /** {@inheritDoc} */
-    @Override public String shortName() {
+    @Override public String name() {
         return "DROP_SQL_INDEX-" + schemaName + "." + idxName + "-" + id;
     }
 
     /** {@inheritDoc} */
-    @Override public void execute(GridKernalContext ctx) {
+    @Override public IgniteInternalFuture<DurableBackgroundTaskResult> executeAsync(GridKernalContext ctx) {
+        log = ctx.log(this.getClass());
+
+        assert worker == null;
+
+        GridFutureAdapter<DurableBackgroundTaskResult> fut = new GridFutureAdapter<>();
+
+        worker = new GridWorker(
+            ctx.igniteInstanceName(),
+            "async-durable-background-task-executor-" + name(),
+            log
+        ) {
+            /** {@inheritDoc} */
+            @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+                try {
+                    execute(ctx);
+
+                    worker = null;
+
+                    fut.onDone(DurableBackgroundTaskResult.complete(null));
+                }
+                catch (Throwable t) {
+                    worker = null;
+
+                    fut.onDone(DurableBackgroundTaskResult.restart(t));
+                }
+            }
+        };
+
+        new IgniteThread(worker).start();
+
+        return fut;
+    }
+
+    /**
+     * Checks that pageId is still relevant and has not been deleted / reused.
+     * @param grpId Cache group id.
+     * @param pageMem Page memory instance.
+     * @param rootPageId Root page identifier.
+     * @return {@code true} if root page was deleted/reused, {@code false} otherwise.
+     */
+    private boolean skipDeletedRoot(int grpId, PageMemory pageMem, long rootPageId) {
+        try {
+            long page = pageMem.acquirePage(grpId, rootPageId);
+
+            try {
+                long pageAddr = pageMem.readLock(grpId, rootPageId, page);
+
+                try {
+                    return pageAddr == 0;
+                }
+                finally {
+                    pageMem.readUnlock(grpId, rootPageId, page);

Review comment:
       Please add the 0-check on `pageAddr` that I forgot to implement: if `readLock` returned 0, the unconditional `readUnlock` in the `finally` block risks a **java.lang.IllegalMonitorStateException** :(

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
##########
@@ -16,315 +16,342 @@
  */
 package org.apache.ignite.internal.processors.localtask;
 
-import java.util.Map;
-import java.util.Set;
+import java.util.Iterator;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.client.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
-import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageTree;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.thread.IgniteThread;
 
-import static org.apache.ignite.internal.util.IgniteUtils.awaitForWorkersStop;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.COMPLETED;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.INIT;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.PREPARE;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.STARTED;
 
 /**
- * Processor that is responsible for durable background tasks that are executed on local node
- * and should be continued even after node restart.
+ * Processor that is responsible for durable background tasks that are executed on local node.
  */
 public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implements MetastorageLifecycleListener,
     CheckpointListener {
     /** Prefix for metastorage keys for durable background tasks. */
-    private static final String STORE_DURABLE_BACKGROUND_TASK_PREFIX = "durable-background-task-";
+    private static final String TASK_PREFIX = "durable-background-task-";
 
-    /** Metastorage. */
-    private volatile ReadWriteMetastorage metastorage;
-
-    /** Metastorage synchronization mutex. */
+    /** MetaStorage synchronization mutex. */
     private final Object metaStorageMux = new Object();
 
-    /** Set of workers that executing durable background tasks. */
-    private final Set<GridWorker> asyncDurableBackgroundTaskWorkers = new GridConcurrentHashSet<>();
-
-    /** Count of workers that executing durable background tasks. */
-    private final AtomicInteger asyncDurableBackgroundTasksWorkersCntr = new AtomicInteger(0);
+    /** Current tasks. Mapping: {@link DurableBackgroundTask#name task name} -> task state. */
+    private final ConcurrentMap<String, DurableBackgroundTaskState> tasks = new ConcurrentHashMap<>();
 
-    /** Durable background tasks map. */
-    private final ConcurrentHashMap<String, DurableBackgroundTask> durableBackgroundTasks = new ConcurrentHashMap<>();
-
-    /** Set of started tasks' names. */
-    private final Set<String> startedTasks = new GridConcurrentHashSet<>();
+    /** Lock for canceling tasks. */
+    private final ReadWriteLock cancelLock = new ReentrantReadWriteLock(true);
 
     /**
-     * Ban to start new tasks. The first time the cluster is activated, it will try again to run existing tasks.
-     *
-     *  @see #onStateChangeFinish(ChangeGlobalStateFinishMessage)
+     * Tasks to be removed from the MetaStorage after the end of a checkpoint.
+     * Mapping: {@link DurableBackgroundTask#name task name} -> task.
      */
-    private volatile boolean forbidStartingNewTasks;
+    private final ConcurrentMap<String, DurableBackgroundTask> toRmv = new ConcurrentHashMap<>();
+
+    /** Prohibiting the execution of tasks. */
+    private volatile boolean prohibitionExecTasks = true;
 
     /**
+     * Constructor.
+     *
      * @param ctx Kernal context.
      */
     public DurableBackgroundTasksProcessor(GridKernalContext ctx) {
         super(ctx);
     }
 
-    /**
-     * Starts the asynchronous operation of pending tasks execution. Is called on start.
-     */
-    private void asyncDurableBackgroundTasksExecution() {
-        assert durableBackgroundTasks != null;
-
-        for (DurableBackgroundTask task : durableBackgroundTasks.values()) {
-            if (!task.isCompleted() && startedTasks.add(task.shortName()))
-                asyncDurableBackgroundTaskExecute(task);
-        }
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
     }
 
-    /**
-     * Creates a worker to execute single durable background task.
-     *
-     * @param task Task.
-     */
-    private void asyncDurableBackgroundTaskExecute(DurableBackgroundTask task) {
-        String workerName = "async-durable-background-task-executor-" + asyncDurableBackgroundTasksWorkersCntr.getAndIncrement();
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        cancelTasks();
+    }
 
-        GridWorker worker = new GridWorker(ctx.igniteInstanceName(), workerName, log) {
-            @Override public void cancel() {
-                task.onCancel();
+    /** {@inheritDoc} */
+    @Override public void onReadyForRead(ReadOnlyMetastorage metastorage) {
+        metaStorageOperation(metaStorage -> {
+            assert metaStorage != null;
+
+            metaStorage.iterate(
+                TASK_PREFIX,
+                (k, v) -> {
+                    DurableBackgroundTask t = (DurableBackgroundTask)v;
+
+                    tasks.put(t.name(), new DurableBackgroundTaskState(t, null, true));
+                },
+                true
+            );
+        });
+    }
 
-                super.cancel();
-            }
+    /** {@inheritDoc} */
+    @Override public void onReadyForReadWrite(ReadWriteMetastorage metastorage) {
+        ((GridCacheDatabaseSharedManager)ctx.cache().context().database()).addCheckpointListener(this);
+    }
 
-            @Override protected void body() {
-                try {
-                    if (forbidStartingNewTasks)
-                        return;
+    /** {@inheritDoc} */
+    @Override public void beforeCheckpointBegin(Context ctx) {
+        /* No op. */
+    }
 
-                    log.info("Executing durable background task: " + task.shortName());
+    /** {@inheritDoc} */
+    @Override public void onMarkCheckpointBegin(Context ctx) {
+        for (Iterator<Entry<String, DurableBackgroundTaskState>> it = tasks.entrySet().iterator(); it.hasNext(); ) {

Review comment:
       You can actually iterate over the `values()` collection instead of the entry set, and removing through the iterator should still work!

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
##########
@@ -16,315 +16,342 @@
  */
 package org.apache.ignite.internal.processors.localtask;
 
-import java.util.Map;
-import java.util.Set;
+import java.util.Iterator;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.client.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
-import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageTree;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.thread.IgniteThread;
 
-import static org.apache.ignite.internal.util.IgniteUtils.awaitForWorkersStop;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.COMPLETED;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.INIT;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.PREPARE;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.STARTED;
 
 /**
- * Processor that is responsible for durable background tasks that are executed on local node
- * and should be continued even after node restart.
+ * Processor that is responsible for durable background tasks that are executed on local node.
  */
 public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implements MetastorageLifecycleListener,
     CheckpointListener {
     /** Prefix for metastorage keys for durable background tasks. */
-    private static final String STORE_DURABLE_BACKGROUND_TASK_PREFIX = "durable-background-task-";
+    private static final String TASK_PREFIX = "durable-background-task-";
 
-    /** Metastorage. */
-    private volatile ReadWriteMetastorage metastorage;
-
-    /** Metastorage synchronization mutex. */
+    /** MetaStorage synchronization mutex. */
     private final Object metaStorageMux = new Object();
 
-    /** Set of workers that executing durable background tasks. */
-    private final Set<GridWorker> asyncDurableBackgroundTaskWorkers = new GridConcurrentHashSet<>();
-
-    /** Count of workers that executing durable background tasks. */
-    private final AtomicInteger asyncDurableBackgroundTasksWorkersCntr = new AtomicInteger(0);
+    /** Current tasks. Mapping: {@link DurableBackgroundTask#name task name} -> task state. */
+    private final ConcurrentMap<String, DurableBackgroundTaskState> tasks = new ConcurrentHashMap<>();
 
-    /** Durable background tasks map. */
-    private final ConcurrentHashMap<String, DurableBackgroundTask> durableBackgroundTasks = new ConcurrentHashMap<>();
-
-    /** Set of started tasks' names. */
-    private final Set<String> startedTasks = new GridConcurrentHashSet<>();
+    /** Lock for canceling tasks. */
+    private final ReadWriteLock cancelLock = new ReentrantReadWriteLock(true);
 
     /**
-     * Ban to start new tasks. The first time the cluster is activated, it will try again to run existing tasks.
-     *
-     *  @see #onStateChangeFinish(ChangeGlobalStateFinishMessage)
+     * Tasks to be removed from the MetaStorage after the end of a checkpoint.
+     * Mapping: {@link DurableBackgroundTask#name task name} -> task.
      */
-    private volatile boolean forbidStartingNewTasks;
+    private final ConcurrentMap<String, DurableBackgroundTask> toRmv = new ConcurrentHashMap<>();
+
+    /** Prohibiting the execution of tasks. */
+    private volatile boolean prohibitionExecTasks = true;
 
     /**
+     * Constructor.
+     *
      * @param ctx Kernal context.
      */
     public DurableBackgroundTasksProcessor(GridKernalContext ctx) {
         super(ctx);
     }
 
-    /**
-     * Starts the asynchronous operation of pending tasks execution. Is called on start.
-     */
-    private void asyncDurableBackgroundTasksExecution() {
-        assert durableBackgroundTasks != null;
-
-        for (DurableBackgroundTask task : durableBackgroundTasks.values()) {
-            if (!task.isCompleted() && startedTasks.add(task.shortName()))
-                asyncDurableBackgroundTaskExecute(task);
-        }
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
     }
 
-    /**
-     * Creates a worker to execute single durable background task.
-     *
-     * @param task Task.
-     */
-    private void asyncDurableBackgroundTaskExecute(DurableBackgroundTask task) {
-        String workerName = "async-durable-background-task-executor-" + asyncDurableBackgroundTasksWorkersCntr.getAndIncrement();
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        cancelTasks();
+    }
 
-        GridWorker worker = new GridWorker(ctx.igniteInstanceName(), workerName, log) {
-            @Override public void cancel() {
-                task.onCancel();
+    /** {@inheritDoc} */
+    @Override public void onReadyForRead(ReadOnlyMetastorage metastorage) {
+        metaStorageOperation(metaStorage -> {
+            assert metaStorage != null;
+
+            metaStorage.iterate(
+                TASK_PREFIX,
+                (k, v) -> {
+                    DurableBackgroundTask t = (DurableBackgroundTask)v;
+
+                    tasks.put(t.name(), new DurableBackgroundTaskState(t, null, true));
+                },
+                true
+            );
+        });
+    }
 
-                super.cancel();
-            }
+    /** {@inheritDoc} */
+    @Override public void onReadyForReadWrite(ReadWriteMetastorage metastorage) {
+        ((GridCacheDatabaseSharedManager)ctx.cache().context().database()).addCheckpointListener(this);
+    }
 
-            @Override protected void body() {
-                try {
-                    if (forbidStartingNewTasks)
-                        return;
+    /** {@inheritDoc} */
+    @Override public void beforeCheckpointBegin(Context ctx) {
+        /* No op. */
+    }
 
-                    log.info("Executing durable background task: " + task.shortName());
+    /** {@inheritDoc} */
+    @Override public void onMarkCheckpointBegin(Context ctx) {
+        for (Iterator<Entry<String, DurableBackgroundTaskState>> it = tasks.entrySet().iterator(); it.hasNext(); ) {
+            DurableBackgroundTaskState taskState = it.next().getValue();
 
-                    task.execute(ctx);
+            if (taskState.state() == COMPLETED) {
+                assert taskState.saved();
 
-                    task.complete();
+                DurableBackgroundTask t = taskState.task();
 
-                    log.info("Execution of durable background task completed: " + task.shortName());
-                }
-                catch (Throwable e) {
-                    log.error("Could not execute durable background task: " + task.shortName(), e);
-                }
-                finally {
-                    startedTasks.remove(task.shortName());
+                toRmv.put(t.name(), t);
 
-                    asyncDurableBackgroundTaskWorkers.remove(this);
-                }
+                it.remove();
             }
-        };
-
-        asyncDurableBackgroundTaskWorkers.add(worker);
-
-        Thread asyncTask = new IgniteThread(worker);
-
-        asyncTask.start();
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public void onKernalStart(boolean active) {
-        asyncDurableBackgroundTasksExecution();
+    @Override public void onCheckpointBegin(Context ctx) {
+        /* No op. */
     }
 
     /** {@inheritDoc} */
-    @Override public void onKernalStop(boolean cancel) {
-        forbidStartingNewTasks = true;
+    @Override public void afterCheckpointEnd(Context ctx) {
+        for (Iterator<Entry<String, DurableBackgroundTask>> it = toRmv.entrySet().iterator(); it.hasNext(); ) {

Review comment:
       Same here: you can iterate over the values collection, and remove should still work.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
##########
@@ -16,315 +16,342 @@
  */
 package org.apache.ignite.internal.processors.localtask;
 
-import java.util.Map;
-import java.util.Set;
+import java.util.Iterator;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.client.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
-import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageTree;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.thread.IgniteThread;
 
-import static org.apache.ignite.internal.util.IgniteUtils.awaitForWorkersStop;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.COMPLETED;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.INIT;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.PREPARE;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.STARTED;
 
 /**
- * Processor that is responsible for durable background tasks that are executed on local node
- * and should be continued even after node restart.
+ * Processor that is responsible for durable background tasks that are executed on local node.
  */
 public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implements MetastorageLifecycleListener,
     CheckpointListener {
     /** Prefix for metastorage keys for durable background tasks. */
-    private static final String STORE_DURABLE_BACKGROUND_TASK_PREFIX = "durable-background-task-";
+    private static final String TASK_PREFIX = "durable-background-task-";
 
-    /** Metastorage. */
-    private volatile ReadWriteMetastorage metastorage;
-
-    /** Metastorage synchronization mutex. */
+    /** MetaStorage synchronization mutex. */
     private final Object metaStorageMux = new Object();
 
-    /** Set of workers that executing durable background tasks. */
-    private final Set<GridWorker> asyncDurableBackgroundTaskWorkers = new GridConcurrentHashSet<>();
-
-    /** Count of workers that executing durable background tasks. */
-    private final AtomicInteger asyncDurableBackgroundTasksWorkersCntr = new AtomicInteger(0);
+    /** Current tasks. Mapping: {@link DurableBackgroundTask#name task name} -> task state. */
+    private final ConcurrentMap<String, DurableBackgroundTaskState> tasks = new ConcurrentHashMap<>();
 
-    /** Durable background tasks map. */
-    private final ConcurrentHashMap<String, DurableBackgroundTask> durableBackgroundTasks = new ConcurrentHashMap<>();
-
-    /** Set of started tasks' names. */
-    private final Set<String> startedTasks = new GridConcurrentHashSet<>();
+    /** Lock for canceling tasks. */
+    private final ReadWriteLock cancelLock = new ReentrantReadWriteLock(true);
 
     /**
-     * Ban to start new tasks. The first time the cluster is activated, it will try again to run existing tasks.
-     *
-     *  @see #onStateChangeFinish(ChangeGlobalStateFinishMessage)
+     * Tasks to be removed from the MetaStorage after the end of a checkpoint.
+     * Mapping: {@link DurableBackgroundTask#name task name} -> task.
      */
-    private volatile boolean forbidStartingNewTasks;
+    private final ConcurrentMap<String, DurableBackgroundTask> toRmv = new ConcurrentHashMap<>();
+
+    /** Prohibiting the execution of tasks. */
+    private volatile boolean prohibitionExecTasks = true;
 
     /**
+     * Constructor.
+     *
      * @param ctx Kernal context.
      */
     public DurableBackgroundTasksProcessor(GridKernalContext ctx) {
         super(ctx);
     }
 
-    /**
-     * Starts the asynchronous operation of pending tasks execution. Is called on start.
-     */
-    private void asyncDurableBackgroundTasksExecution() {
-        assert durableBackgroundTasks != null;
-
-        for (DurableBackgroundTask task : durableBackgroundTasks.values()) {
-            if (!task.isCompleted() && startedTasks.add(task.shortName()))
-                asyncDurableBackgroundTaskExecute(task);
-        }
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
     }
 
-    /**
-     * Creates a worker to execute single durable background task.
-     *
-     * @param task Task.
-     */
-    private void asyncDurableBackgroundTaskExecute(DurableBackgroundTask task) {
-        String workerName = "async-durable-background-task-executor-" + asyncDurableBackgroundTasksWorkersCntr.getAndIncrement();
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        cancelTasks();
+    }
 
-        GridWorker worker = new GridWorker(ctx.igniteInstanceName(), workerName, log) {
-            @Override public void cancel() {
-                task.onCancel();
+    /** {@inheritDoc} */
+    @Override public void onReadyForRead(ReadOnlyMetastorage metastorage) {
+        metaStorageOperation(metaStorage -> {
+            assert metaStorage != null;
+
+            metaStorage.iterate(
+                TASK_PREFIX,
+                (k, v) -> {
+                    DurableBackgroundTask t = (DurableBackgroundTask)v;
+
+                    tasks.put(t.name(), new DurableBackgroundTaskState(t, null, true));
+                },
+                true
+            );
+        });
+    }
 
-                super.cancel();
-            }
+    /** {@inheritDoc} */
+    @Override public void onReadyForReadWrite(ReadWriteMetastorage metastorage) {
+        ((GridCacheDatabaseSharedManager)ctx.cache().context().database()).addCheckpointListener(this);
+    }
 
-            @Override protected void body() {
-                try {
-                    if (forbidStartingNewTasks)
-                        return;
+    /** {@inheritDoc} */
+    @Override public void beforeCheckpointBegin(Context ctx) {
+        /* No op. */
+    }
 
-                    log.info("Executing durable background task: " + task.shortName());
+    /** {@inheritDoc} */
+    @Override public void onMarkCheckpointBegin(Context ctx) {
+        for (Iterator<Entry<String, DurableBackgroundTaskState>> it = tasks.entrySet().iterator(); it.hasNext(); ) {
+            DurableBackgroundTaskState taskState = it.next().getValue();
 
-                    task.execute(ctx);
+            if (taskState.state() == COMPLETED) {
+                assert taskState.saved();
 
-                    task.complete();
+                DurableBackgroundTask t = taskState.task();
 
-                    log.info("Execution of durable background task completed: " + task.shortName());
-                }
-                catch (Throwable e) {
-                    log.error("Could not execute durable background task: " + task.shortName(), e);
-                }
-                finally {
-                    startedTasks.remove(task.shortName());
+                toRmv.put(t.name(), t);
 
-                    asyncDurableBackgroundTaskWorkers.remove(this);
-                }
+                it.remove();
             }
-        };
-
-        asyncDurableBackgroundTaskWorkers.add(worker);
-
-        Thread asyncTask = new IgniteThread(worker);
-
-        asyncTask.start();
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public void onKernalStart(boolean active) {
-        asyncDurableBackgroundTasksExecution();
+    @Override public void onCheckpointBegin(Context ctx) {
+        /* No op. */
     }
 
     /** {@inheritDoc} */
-    @Override public void onKernalStop(boolean cancel) {
-        forbidStartingNewTasks = true;
+    @Override public void afterCheckpointEnd(Context ctx) {
+        for (Iterator<Entry<String, DurableBackgroundTask>> it = toRmv.entrySet().iterator(); it.hasNext(); ) {
+            DurableBackgroundTask t = it.next().getValue();
 
-        awaitForWorkersStop(asyncDurableBackgroundTaskWorkers, true, log);
-    }
+            metaStorageOperation(metaStorage -> {
+                if (metaStorage != null && toRmv.containsKey(t.name()))

Review comment:
       What's the point of the **toRmv.containsKey(t.name())** check?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
##########
@@ -16,315 +16,342 @@
  */
 package org.apache.ignite.internal.processors.localtask;
 
-import java.util.Map;
-import java.util.Set;
+import java.util.Iterator;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.client.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
-import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageTree;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.thread.IgniteThread;
 
-import static org.apache.ignite.internal.util.IgniteUtils.awaitForWorkersStop;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.COMPLETED;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.INIT;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.PREPARE;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.STARTED;
 
 /**
- * Processor that is responsible for durable background tasks that are executed on local node
- * and should be continued even after node restart.
+ * Processor that is responsible for durable background tasks that are executed on local node.
  */
 public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implements MetastorageLifecycleListener,
     CheckpointListener {
     /** Prefix for metastorage keys for durable background tasks. */
-    private static final String STORE_DURABLE_BACKGROUND_TASK_PREFIX = "durable-background-task-";
+    private static final String TASK_PREFIX = "durable-background-task-";
 
-    /** Metastorage. */
-    private volatile ReadWriteMetastorage metastorage;
-
-    /** Metastorage synchronization mutex. */
+    /** MetaStorage synchronization mutex. */
     private final Object metaStorageMux = new Object();
 
-    /** Set of workers that executing durable background tasks. */
-    private final Set<GridWorker> asyncDurableBackgroundTaskWorkers = new GridConcurrentHashSet<>();
-
-    /** Count of workers that executing durable background tasks. */
-    private final AtomicInteger asyncDurableBackgroundTasksWorkersCntr = new AtomicInteger(0);
+    /** Current tasks. Mapping: {@link DurableBackgroundTask#name task name} -> task state. */
+    private final ConcurrentMap<String, DurableBackgroundTaskState> tasks = new ConcurrentHashMap<>();
 
-    /** Durable background tasks map. */
-    private final ConcurrentHashMap<String, DurableBackgroundTask> durableBackgroundTasks = new ConcurrentHashMap<>();
-
-    /** Set of started tasks' names. */
-    private final Set<String> startedTasks = new GridConcurrentHashSet<>();
+    /** Lock for canceling tasks. */
+    private final ReadWriteLock cancelLock = new ReentrantReadWriteLock(true);
 
     /**
-     * Ban to start new tasks. The first time the cluster is activated, it will try again to run existing tasks.
-     *
-     *  @see #onStateChangeFinish(ChangeGlobalStateFinishMessage)
+     * Tasks to be removed from the MetaStorage after the end of a checkpoint.
+     * Mapping: {@link DurableBackgroundTask#name task name} -> task.
      */
-    private volatile boolean forbidStartingNewTasks;
+    private final ConcurrentMap<String, DurableBackgroundTask> toRmv = new ConcurrentHashMap<>();
+
+    /** Prohibiting the execution of tasks. */
+    private volatile boolean prohibitionExecTasks = true;
 
     /**
+     * Constructor.
+     *
      * @param ctx Kernal context.
      */
     public DurableBackgroundTasksProcessor(GridKernalContext ctx) {
         super(ctx);
     }
 
-    /**
-     * Starts the asynchronous operation of pending tasks execution. Is called on start.
-     */
-    private void asyncDurableBackgroundTasksExecution() {
-        assert durableBackgroundTasks != null;
-
-        for (DurableBackgroundTask task : durableBackgroundTasks.values()) {
-            if (!task.isCompleted() && startedTasks.add(task.shortName()))
-                asyncDurableBackgroundTaskExecute(task);
-        }
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
     }
 
-    /**
-     * Creates a worker to execute single durable background task.
-     *
-     * @param task Task.
-     */
-    private void asyncDurableBackgroundTaskExecute(DurableBackgroundTask task) {
-        String workerName = "async-durable-background-task-executor-" + asyncDurableBackgroundTasksWorkersCntr.getAndIncrement();
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        cancelTasks();
+    }
 
-        GridWorker worker = new GridWorker(ctx.igniteInstanceName(), workerName, log) {
-            @Override public void cancel() {
-                task.onCancel();
+    /** {@inheritDoc} */
+    @Override public void onReadyForRead(ReadOnlyMetastorage metastorage) {
+        metaStorageOperation(metaStorage -> {
+            assert metaStorage != null;
+
+            metaStorage.iterate(
+                TASK_PREFIX,
+                (k, v) -> {
+                    DurableBackgroundTask t = (DurableBackgroundTask)v;
+
+                    tasks.put(t.name(), new DurableBackgroundTaskState(t, null, true));
+                },
+                true
+            );
+        });
+    }
 
-                super.cancel();
-            }
+    /** {@inheritDoc} */
+    @Override public void onReadyForReadWrite(ReadWriteMetastorage metastorage) {
+        ((GridCacheDatabaseSharedManager)ctx.cache().context().database()).addCheckpointListener(this);
+    }
 
-            @Override protected void body() {
-                try {
-                    if (forbidStartingNewTasks)
-                        return;
+    /** {@inheritDoc} */
+    @Override public void beforeCheckpointBegin(Context ctx) {
+        /* No op. */
+    }
 
-                    log.info("Executing durable background task: " + task.shortName());
+    /** {@inheritDoc} */
+    @Override public void onMarkCheckpointBegin(Context ctx) {
+        for (Iterator<Entry<String, DurableBackgroundTaskState>> it = tasks.entrySet().iterator(); it.hasNext(); ) {
+            DurableBackgroundTaskState taskState = it.next().getValue();
 
-                    task.execute(ctx);
+            if (taskState.state() == COMPLETED) {
+                assert taskState.saved();
 
-                    task.complete();
+                DurableBackgroundTask t = taskState.task();
 
-                    log.info("Execution of durable background task completed: " + task.shortName());
-                }
-                catch (Throwable e) {
-                    log.error("Could not execute durable background task: " + task.shortName(), e);
-                }
-                finally {
-                    startedTasks.remove(task.shortName());
+                toRmv.put(t.name(), t);
 
-                    asyncDurableBackgroundTaskWorkers.remove(this);
-                }
+                it.remove();
             }
-        };
-
-        asyncDurableBackgroundTaskWorkers.add(worker);
-
-        Thread asyncTask = new IgniteThread(worker);
-
-        asyncTask.start();
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public void onKernalStart(boolean active) {
-        asyncDurableBackgroundTasksExecution();
+    @Override public void onCheckpointBegin(Context ctx) {
+        /* No op. */
     }
 
     /** {@inheritDoc} */
-    @Override public void onKernalStop(boolean cancel) {
-        forbidStartingNewTasks = true;
+    @Override public void afterCheckpointEnd(Context ctx) {
+        for (Iterator<Entry<String, DurableBackgroundTask>> it = toRmv.entrySet().iterator(); it.hasNext(); ) {
+            DurableBackgroundTask t = it.next().getValue();
 
-        awaitForWorkersStop(asyncDurableBackgroundTaskWorkers, true, log);
-    }
+            metaStorageOperation(metaStorage -> {
+                if (metaStorage != null && toRmv.containsKey(t.name()))
+                    metaStorage.remove(metaStorageKey(t));
+            });
 
-    /** {@inheritDoc} */
-    @Override public void start() throws IgniteCheckedException {
-        ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
+            it.remove();
+        }
     }
 
     /**
-     * @param msg Message.
+     * Callback at the start of a global state change.
+     *
+     * @param msg Message for change cluster global state.
      */
-    public void onStateChange(ChangeGlobalStateMessage msg) {
-        if (msg.state() == ClusterState.INACTIVE) {
-            forbidStartingNewTasks = true;
-
-            awaitForWorkersStop(asyncDurableBackgroundTaskWorkers, true, log);
-        }
+    public void onStateChangeStarted(ChangeGlobalStateMessage msg) {
+        if (msg.state() == ClusterState.INACTIVE)
+            cancelTasks();
     }
 
     /**
-     * @param msg Message.
+     * Callback on finish of a global state change.
+     *
+     * @param msg Finish message for change cluster global state.
      */
     public void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) {
         if (msg.state() != ClusterState.INACTIVE) {
-            forbidStartingNewTasks = false;
-
-            asyncDurableBackgroundTasksExecution();
-        }
-    }
+            prohibitionExecTasks = false;
 
-    /** {@inheritDoc} */
-    @Override public void onReadyForRead(ReadOnlyMetastorage metastorage) {
-        synchronized (metaStorageMux) {
-            if (durableBackgroundTasks.isEmpty()) {
-                try {
-                    metastorage.iterate(
-                        STORE_DURABLE_BACKGROUND_TASK_PREFIX,
-                        (key, val) -> durableBackgroundTasks.put(key, (DurableBackgroundTask)val),
-                        true
-                    );
-                }
-                catch (IgniteCheckedException e) {
-                    throw new IgniteException("Failed to iterate durable background tasks storage.", e);
-                }
+            for (DurableBackgroundTaskState taskState : tasks.values()) {
+                if (!prohibitionExecTasks)
+                    executeAsync0(taskState.task());
             }
         }
     }
 
-    /** {@inheritDoc} */
-    @Override public void onReadyForReadWrite(ReadWriteMetastorage metastorage) {
-        synchronized (metaStorageMux) {
-            try {
-                for (Map.Entry<String, DurableBackgroundTask> entry : durableBackgroundTasks.entrySet()) {
-                    if (metastorage.readRaw(entry.getKey()) == null)
-                        metastorage.write(entry.getKey(), entry.getValue());
-                }
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException("Failed to read key from durable background tasks storage.", e);
-            }
+    /**
+     * Asynchronous execution of a durable background task.
+     *
+     * A new task will be added for execution either if there is no task with
+     * the same {@link DurableBackgroundTask#name name} or it (previous) will be completed.
+     *
+     * If the task is required to be completed after restarting the node,
+     * then it must be saved to the MetaStorage.
+     *
+     * If the task is saved to the Metastorage, then it will be deleted from it
+     * only after its completion and at the end of the checkpoint. Otherwise, it
+     * will be removed as soon as it is completed.
+     *
+     * @param task Durable background task.
+     * @param save Save task to MetaStorage.
+     * @return Futures that will complete when the task is completed.
+     */
+    public IgniteInternalFuture<Void> executeAsync(DurableBackgroundTask task, boolean save) {
+        DurableBackgroundTaskState taskState = tasks.compute(task.name(), (taskName, prev) -> {
+            if (prev != null && prev.state() != COMPLETED)
+                throw new IllegalArgumentException("Task is already present and has not been completed: " + taskName);
+
+            if (save)
+                toRmv.remove(taskName);
+
+            return new DurableBackgroundTaskState(task, new GridFutureAdapter<>(), save);
+        });
+
+        if (save) {
+            metaStorageOperation(metaStorage -> {
+                if (metaStorage != null)
+                    metaStorage.write(metaStorageKey(task), task);
+            });
         }
 
-        ((GridCacheDatabaseSharedManager)ctx.cache().context().database()).addCheckpointListener(this);
+        if (!prohibitionExecTasks)
+            executeAsync0(task);
 
-        this.metastorage = metastorage;
+        return taskState.outFuture();
     }
 
     /**
-     * Builds a metastorage key for durable background task object.
+     * Overloading the {@link #executeAsync(DurableBackgroundTask, boolean)}.
+     * If task is applied to persistent cache, saves it to MetaStorage.
      *
-     * @param obj Object.
-     * @return Metastorage key.
+     * @param t Durable background task.
+     * @param cacheCfg Cache configuration.
+     * @return Futures that will complete when the task is completed.
      */
-    private String durableBackgroundTaskMetastorageKey(DurableBackgroundTask obj) {
-        String k = STORE_DURABLE_BACKGROUND_TASK_PREFIX + obj.shortName();
-
-        if (k.length() > MetastorageTree.MAX_KEY_LEN) {
-            int hashLenLimit = 5;
-
-            String hash = String.valueOf(k.hashCode());
-
-            k = k.substring(0, MetastorageTree.MAX_KEY_LEN - hashLenLimit) +
-                (hash.length() > hashLenLimit ? hash.substring(0, hashLenLimit) : hash);
-        }
-
-        return k;
+    public IgniteInternalFuture<Void> executeAsync(DurableBackgroundTask t, CacheConfiguration cacheCfg) {
+        return executeAsync(t, CU.isPersistentCache(cacheCfg, ctx.config().getDataStorageConfiguration()));
     }
 
     /**
-     * Adds durable background task object.
+     * Asynchronous execution of a durable background task.
      *
-     * @param obj Object.
+     * @param t Durable background task.
      */
-    private void addDurableBackgroundTask(DurableBackgroundTask obj) {
-        String objName = durableBackgroundTaskMetastorageKey(obj);
+    private void executeAsync0(DurableBackgroundTask t) {
+        cancelLock.readLock().lock();
 
-        synchronized (metaStorageMux) {
-            durableBackgroundTasks.put(objName, obj);
-
-            if (metastorage != null) {
-                ctx.cache().context().database().checkpointReadLock();
-
-                try {
-                    metastorage.write(objName, obj);
-                }
-                catch (IgniteCheckedException e) {
-                    throw new IgniteException(e);
-                }
-                finally {
-                    ctx.cache().context().database().checkpointReadUnlock();
-                }
+        try {
+            DurableBackgroundTaskState taskState = tasks.get(t.name());
+
+            if (taskState != null && taskState.state(INIT, PREPARE)) {
+                if (log.isInfoEnabled())
+                    log.info("Executing durable background task: " + t.name());
+
+                t.executeAsync(ctx).listen(f -> {
+                    DurableBackgroundTaskResult res;
+
+                    try {
+                        res = f.get();
+
+                        assert res != null;
+                    }
+                    catch (Throwable e) {
+                        throw new AssertionError("Task completed with an error", e);
+                    }
+
+                    if (res.error() != null)
+                        log.error("Could not execute durable background task: " + t.name(), res.error());
+
+                    if (res.completed()) {
+                        if (res.error() == null && log.isInfoEnabled())
+                            log.info("Execution of durable background task completed: " + t.name());
+
+                        if (taskState.saved())
+                            taskState.state(COMPLETED);
+                        else
+                            tasks.remove(t.name());
+
+                        GridFutureAdapter<Void> outFut = taskState.outFuture();
+
+                        if (outFut != null)
+                            outFut.onDone(res.error());
+                    }
+                    else if (res.restart()) {
+                        if (log.isInfoEnabled())
+                            log.info("Execution of durable background task will be restarted: " + t.name());
+
+                        taskState.state(INIT);
+                    }
+                    else
+                        throw new AssertionError("Not supported: " + res);

Review comment:
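       The preceding `else if (res.restart())` can be made an unconditional `else` with **assert res.restart()** in it; the trailing `else` that throws the AssertionError then becomes unnecessary.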




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org