You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/05/12 06:55:37 UTC

[ignite] branch master updated: IGNITE-14684 Fixed node fail due to deleting DurableBackgroundTask's at the end of a checkpoint when stopping a node (#9091)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new e2fa709  IGNITE-14684 Fixed node fail due to deleting DurableBackgroundTask's at the end of a checkpoint when stopping a node (#9091)
e2fa709 is described below

commit e2fa7098350a0e1b078dbf9e4e7f46ec96510411
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Wed May 12 09:55:08 2021 +0300

    IGNITE-14684 Fixed node fail due to deleting DurableBackgroundTask's at the end of a checkpoint when stopping a node (#9091)
---
 .../apache/ignite/internal/GridKernalContext.java  |   2 +-
 .../ignite/internal/GridKernalContextImpl.java     |   2 +-
 .../query/index/sorted/inline/InlineIndexImpl.java |   2 +-
 .../persistence/checkpoint/CheckpointWorkflow.java |   3 +-
 .../wal/reader/StandaloneGridKernalContext.java    |   2 +-
 .../cluster/GridClusterStateProcessor.java         |   4 +-
 .../localtask/DurableBackgroundTasksProcessor.java | 103 ++++++++++------
 .../DurableBackgroundTasksProcessorSelfTest.java   | 132 ++++++++++++++++++++-
 .../localtask/ObservingCheckpointListener.java     |  16 ++-
 9 files changed, 213 insertions(+), 53 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index f73725f..a80c3eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -762,7 +762,7 @@ public interface GridKernalContext extends Iterable<GridComponent> {
     /**
      * @return Local continuous tasks processor.
      */
-    public DurableBackgroundTasksProcessor durableBackgroundTasksProcessor();
+    public DurableBackgroundTasksProcessor durableBackgroundTask();
 
     /**
      * Return Thread pool for create/rebuild indexes.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 3fdc5f4..4c5d273 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -1311,7 +1311,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
     }
 
     /** {@inheritDoc} */
-    @Override public DurableBackgroundTasksProcessor durableBackgroundTasksProcessor() {
+    @Override public DurableBackgroundTasksProcessor durableBackgroundTask() {
         return durableBackgroundTasksProcessor;
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java
index 841ef0d..3a91a34 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java
@@ -449,7 +449,7 @@ public class InlineIndexImpl extends AbstractIndex implements InlineIndex {
                     treeName
                 );
 
-                cctx.kernalContext().durableBackgroundTasksProcessor().executeAsync(task, cctx.config());
+                cctx.kernalContext().durableBackgroundTask().executeAsync(task, cctx.config());
             }
         }
         catch (IgniteCheckedException e) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java
index 21fcc00..e383b0b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java
@@ -74,7 +74,6 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.WorkProgressDispatcher;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.thread.IgniteThreadPoolExecutor;
-import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentLinkedHashMap;
 
@@ -608,7 +607,7 @@ public class CheckpointWorkflow {
      * @param checkpointedRegions Regions which will be checkpointed.
      * @return Checkpoint listeners which should be handled.
      */
-    @NotNull private List<CheckpointListener> getRelevantCheckpointListeners(Collection<DataRegion> checkpointedRegions) {
+    public List<CheckpointListener> getRelevantCheckpointListeners(Collection<DataRegion> checkpointedRegions) {
         return lsnrs.entrySet().stream()
             .filter(entry -> entry.getValue() == NO_REGION || checkpointedRegions.contains(entry.getValue()))
             .map(Map.Entry::getKey)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index 0178db4..2951f1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -728,7 +728,7 @@ public class StandaloneGridKernalContext implements GridKernalContext {
     }
 
     /** {@inheritDoc} */
-    @Override public DurableBackgroundTasksProcessor durableBackgroundTasksProcessor() {
+    @Override public DurableBackgroundTasksProcessor durableBackgroundTask() {
         return null;
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index a3d1fdd..b7ff732 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -588,7 +588,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
 
             ctx.cache().onStateChangeFinish(msg);
 
-            ctx.durableBackgroundTasksProcessor().onStateChangeFinish(msg);
+            ctx.durableBackgroundTask().onStateChangeFinish(msg);
 
             if (discoClusterState.lastState() == ACTIVE_READ_ONLY || globalState.state() == ACTIVE_READ_ONLY)
                 ctx.cache().context().readOnlyMode(globalState.state() == ACTIVE_READ_ONLY);
@@ -729,7 +729,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
                     nodeIds
                 );
 
-                ctx.durableBackgroundTasksProcessor().onStateChangeStarted(msg);
+                ctx.durableBackgroundTask().onStateChangeStarted(msg);
 
                 if (msg.forceChangeBaselineTopology())
                     newState.setTransitionResult(msg.requestId(), msg.state());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
index 9fb998f..8cee313 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendi
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.apache.ignite.internal.util.GridBusyLock;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -74,6 +75,9 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem
     /** Prohibiting the execution of tasks. */
     private volatile boolean prohibitionExecTasks = true;
 
+    /** Node stop lock. */
+    private final GridBusyLock stopLock = new GridBusyLock();
+
     /**
      * Constructor.
      *
@@ -91,23 +95,33 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem
     /** {@inheritDoc} */
     @Override public void onKernalStop(boolean cancel) {
         cancelTasks();
+
+        stopLock.block();
     }
 
     /** {@inheritDoc} */
     @Override public void onReadyForRead(ReadOnlyMetastorage metastorage) {
-        metaStorageOperation(metaStorage -> {
-            assert metaStorage != null;
-
-            metaStorage.iterate(
-                TASK_PREFIX,
-                (k, v) -> {
-                    DurableBackgroundTask t = (DurableBackgroundTask)v;
-
-                    tasks.put(t.name(), new DurableBackgroundTaskState(t, null, true));
-                },
-                true
-            );
-        });
+        if (!stopLock.enterBusy())
+            return;
+
+        try {
+            metaStorageOperation(metaStorage -> {
+                assert metaStorage != null;
+
+                metaStorage.iterate(
+                    TASK_PREFIX,
+                    (k, v) -> {
+                        DurableBackgroundTask t = (DurableBackgroundTask)v;
+
+                        tasks.put(t.name(), new DurableBackgroundTaskState(t, null, true));
+                    },
+                    true
+                );
+            });
+        }
+        finally {
+            stopLock.leaveBusy();
+        }
     }
 
     /** {@inheritDoc} */
@@ -144,15 +158,25 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem
 
     /** {@inheritDoc} */
     @Override public void afterCheckpointEnd(Context ctx) {
-        for (Iterator<DurableBackgroundTask> it = toRmv.values().iterator(); it.hasNext(); ) {
-            DurableBackgroundTask t = it.next();
+        if (!stopLock.enterBusy())
+            return;
 
-            metaStorageOperation(metaStorage -> {
-                if (metaStorage != null && toRmv.containsKey(t.name()))
-                    metaStorage.remove(metaStorageKey(t));
-            });
+        try {
+            for (Iterator<DurableBackgroundTask> it = toRmv.values().iterator(); it.hasNext(); ) {
+                DurableBackgroundTask t = it.next();
+
+                metaStorageOperation(metaStorage -> {
+                    if (metaStorage != null) {
+                        if (!tasks.containsKey(t.name()))
+                            metaStorage.remove(metaStorageKey(t));
 
-            it.remove();
+                        it.remove();
+                    }
+                });
+            }
+        }
+        finally {
+            stopLock.leaveBusy();
         }
     }
 
@@ -200,27 +224,34 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem
      * @return Futures that will complete when the task is completed.
      */
     public IgniteInternalFuture<Void> executeAsync(DurableBackgroundTask task, boolean save) {
-        DurableBackgroundTaskState taskState = tasks.compute(task.name(), (taskName, prev) -> {
-            if (prev != null && prev.state() != COMPLETED)
-                throw new IllegalArgumentException("Task is already present and has not been completed: " + taskName);
-
-            if (save)
-                toRmv.remove(taskName);
+        if (!stopLock.enterBusy())
+            throw new IgniteException("Node is stopping.");
 
-            return new DurableBackgroundTaskState(task, new GridFutureAdapter<>(), save);
-        });
+        try {
+            DurableBackgroundTaskState taskState = tasks.compute(task.name(), (taskName, prev) -> {
+                if (prev != null && prev.state() != COMPLETED) {
+                    throw new IllegalArgumentException("Task is already present and has not been completed: " +
+                        taskName);
+                }
 
-        if (save) {
-            metaStorageOperation(metaStorage -> {
-                if (metaStorage != null)
-                    metaStorage.write(metaStorageKey(task), task);
+                return new DurableBackgroundTaskState(task, new GridFutureAdapter<>(), save);
             });
-        }
 
-        if (!prohibitionExecTasks)
-            executeAsync0(task);
+            if (save) {
+                metaStorageOperation(metaStorage -> {
+                    if (metaStorage != null)
+                        metaStorage.write(metaStorageKey(task), task);
+                });
+            }
+
+            if (!prohibitionExecTasks)
+                executeAsync0(task);
 
-        return taskState.outFuture();
+            return taskState.outFuture();
+        }
+        finally {
+            stopLock.leaveBusy();
+        }
     }
 
     /**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessorSelfTest.java
index ba0da72..8b389b8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessorSelfTest.java
@@ -17,8 +17,11 @@
 
 package org.apache.ignite.internal.processors.localtask;
 
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -26,8 +29,11 @@ import org.apache.ignite.failure.StopNodeFailureHandler;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointWorkflow;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult;
 import org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.IgniteThrowableFunction;
@@ -45,6 +51,7 @@ import static org.apache.ignite.internal.processors.localtask.DurableBackgroundT
 import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTasksProcessor.metaStorageKey;
 import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
 import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
 
 /**
  * Class for testing the {@link DurableBackgroundTasksProcessor}.
@@ -171,6 +178,111 @@ public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractT
     }
 
     /**
+     * Checks that stopping the node does not fail the node when deleting tasks.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testNotFailNodeWhenNodeStoppindAndDeleteTasks() throws Exception {
+        IgniteEx n = startGrid(0);
+
+        ObservingCheckpointListener observingCpLsnr = new ObservingCheckpointListener();
+        dbMgr(n).addCheckpointListener(observingCpLsnr);
+
+        n.cluster().state(ACTIVE);
+
+        CheckpointWorkflow cpWorkflow = checkpointWorkflow(n);
+        List<CheckpointListener> cpLs = cpWorkflow.getRelevantCheckpointListeners(dbMgr(n).checkpointedDataRegions());
+
+        assertTrue(cpLs.contains(observingCpLsnr));
+        assertTrue(cpLs.contains(durableBackgroundTask(n)));
+        assertTrue(cpLs.indexOf(observingCpLsnr) < cpLs.indexOf(durableBackgroundTask(n)));
+
+        SimpleTask simpleTask = new SimpleTask("t");
+        IgniteInternalFuture<Void> taskFut = durableBackgroundTask(n).executeAsync(simpleTask, true);
+
+        simpleTask.onExecFut.get(getTestTimeout());
+
+        GridFutureAdapter<Void> startStopFut = new GridFutureAdapter<>();
+        GridFutureAdapter<Void> finishStopFut = new GridFutureAdapter<>();
+
+        observingCpLsnr.repeatOnMarkCheckpointBeginConsumer = true;
+        observingCpLsnr.onMarkCheckpointBeginConsumer = ctx -> {
+            if (n.context().isStopping()) {
+                startStopFut.onDone();
+
+                finishStopFut.get(getTestTimeout());
+
+                observingCpLsnr.repeatOnMarkCheckpointBeginConsumer = false;
+            }
+        };
+
+        IgniteInternalFuture<Void> stopFut = runAsync(() -> stopAllGrids(false));
+
+        startStopFut.get(getTestTimeout());
+
+        simpleTask.taskFut.onDone(DurableBackgroundTaskResult.complete(null));
+        taskFut.get(getTestTimeout());
+
+        finishStopFut.onDone();
+
+        stopFut.get(getTestTimeout());
+        assertNull(n.context().failure().failureContext());
+
+        assertThrows(log, () -> durableBackgroundTask(n).executeAsync(simpleTask, true), IgniteException.class, null);
+    }
+
+    /**
+     * Check that the task will not be deleted from the MetaStorage if it was restarted.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testDontDeleteTaskIfItsRestart() throws Exception {
+        IgniteEx n = startGrid(0);
+
+        ObservingCheckpointListener observingCpLsnr = new ObservingCheckpointListener();
+        dbMgr(n).addCheckpointListener(observingCpLsnr);
+
+        n.cluster().state(ACTIVE);
+
+        CheckpointWorkflow cpWorkflow = checkpointWorkflow(n);
+        List<CheckpointListener> cpLs = cpWorkflow.getRelevantCheckpointListeners(dbMgr(n).checkpointedDataRegions());
+
+        assertTrue(cpLs.contains(observingCpLsnr));
+        assertTrue(cpLs.contains(durableBackgroundTask(n)));
+        assertTrue(cpLs.indexOf(observingCpLsnr) < cpLs.indexOf(durableBackgroundTask(n)));
+
+        SimpleTask simpleTask0 = new SimpleTask("t");
+        IgniteInternalFuture<Void> taskFut = durableBackgroundTask(n).executeAsync(simpleTask0, true);
+
+        simpleTask0.onExecFut.get(getTestTimeout());
+
+        dbMgr(n).enableCheckpoints(false).get(getTestTimeout());
+
+        simpleTask0.taskFut.onDone(DurableBackgroundTaskResult.complete(null));
+        taskFut.get(getTestTimeout());
+
+        SimpleTask simpleTask1 = new SimpleTask("t");
+        AtomicReference<IgniteInternalFuture<Void>> taskFutRef = new AtomicReference<>();
+
+        observingCpLsnr.afterCheckpointEndConsumer =
+            ctx -> taskFutRef.set(durableBackgroundTask(n).executeAsync(simpleTask1, true));
+
+        dbMgr(n).enableCheckpoints(true).get(getTestTimeout());
+        forceCheckpoint();
+
+        assertNotNull(metaStorageOperation(n, ms -> ms.read(metaStorageKey(simpleTask0))));
+
+        simpleTask1.onExecFut.get(getTestTimeout());
+        simpleTask1.taskFut.onDone(DurableBackgroundTaskResult.complete(null));
+        taskFutRef.get().get(getTestTimeout());
+
+        forceCheckpoint();
+        assertNull(metaStorageOperation(n, ms -> ms.read(metaStorageKey(simpleTask1))));
+    }
+
+    /**
      * Check that the task will be restarted correctly.
      *
      * @param save Save to MetaStorage.
@@ -342,7 +454,7 @@ public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractT
      * @return Task future.
      */
     private IgniteInternalFuture<Void> execAsync(IgniteEx n, DurableBackgroundTask t, boolean save) {
-        return durableBackgroundTasksProcessor(n).executeAsync(t, save);
+        return durableBackgroundTask(n).executeAsync(t, save);
     }
 
     /**
@@ -352,7 +464,7 @@ public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractT
      * @return Map of tasks that will be removed from the MetaStorage.
      */
     private Map<String, DurableBackgroundTask> toRmv(IgniteEx n) {
-        return getFieldValue(durableBackgroundTasksProcessor(n), "toRmv");
+        return getFieldValue(durableBackgroundTask(n), "toRmv");
     }
 
     /**
@@ -362,7 +474,7 @@ public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractT
      * @return Task states map.
      */
     private Map<String, DurableBackgroundTaskState> tasks(IgniteEx n) {
-        return getFieldValue(durableBackgroundTasksProcessor(n), "tasks");
+        return getFieldValue(durableBackgroundTask(n), "tasks");
     }
 
     /**
@@ -371,8 +483,8 @@ public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractT
      * @param n Node.
      * @return Durable background task processor.
      */
-    private DurableBackgroundTasksProcessor durableBackgroundTasksProcessor(IgniteEx n) {
-        return n.context().durableBackgroundTasksProcessor();
+    private DurableBackgroundTasksProcessor durableBackgroundTask(IgniteEx n) {
+        return n.context().durableBackgroundTask();
     }
 
     /**
@@ -398,4 +510,14 @@ public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractT
             dbMgr.checkpointReadUnlock();
         }
     }
+
+    /**
+     * Getting {@code CheckpointManager#checkpointWorkflow}.
+     *
+     * @param n Node.
+     * @return Checkpoint workflow.
+     */
+    private CheckpointWorkflow checkpointWorkflow(IgniteEx n) {
+        return getFieldValue(dbMgr(n), "checkpointManager", "checkpointWorkflow");
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/ObservingCheckpointListener.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/ObservingCheckpointListener.java
index b86ed09..b7b1bbd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/ObservingCheckpointListener.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/ObservingCheckpointListener.java
@@ -30,17 +30,24 @@ public class ObservingCheckpointListener implements CheckpointListener {
     /** On mark checkpoint begin consumer. */
     volatile IgniteThrowableConsumer<Context> onMarkCheckpointBeginConsumer;
 
+    /** Repeat {@link #onMarkCheckpointBeginConsumer}. */
+    volatile boolean repeatOnMarkCheckpointBeginConsumer;
+
     /** After checkpoint end consumer. */
     volatile IgniteThrowableConsumer<Context> afterCheckpointEndConsumer;
 
+    /** Repeat {@link #afterCheckpointEndConsumer}. */
+    volatile boolean repeatAfterCheckpointEndConsumer;
+
     /** {@inheritDoc} */
     @Override public void onMarkCheckpointBegin(Context ctx) throws IgniteCheckedException {
         IgniteThrowableConsumer<Context> consumer = onMarkCheckpointBeginConsumer;
 
         if (consumer != null) {
-            onMarkCheckpointBeginConsumer = null;
-
             consumer.accept(ctx);
+
+            if (!repeatOnMarkCheckpointBeginConsumer)
+                onMarkCheckpointBeginConsumer = null;
         }
     }
 
@@ -59,9 +66,10 @@ public class ObservingCheckpointListener implements CheckpointListener {
         IgniteThrowableConsumer<Context> consumer = afterCheckpointEndConsumer;
 
         if (consumer != null) {
-            afterCheckpointEndConsumer = null;
-
             consumer.accept(ctx);
+
+            if (!repeatAfterCheckpointEndConsumer)
+                afterCheckpointEndConsumer = null;
         }
     }