You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/05/12 06:55:37 UTC
[ignite] branch master updated: IGNITE-14684 Fixed node fail due to
deleting DurableBackgroundTask's at the end of a checkpoint when stopping a
node (#9091)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new e2fa709 IGNITE-14684 Fixed node fail due to deleting DurableBackgroundTask's at the end of a checkpoint when stopping a node (#9091)
e2fa709 is described below
commit e2fa7098350a0e1b078dbf9e4e7f46ec96510411
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Wed May 12 09:55:08 2021 +0300
IGNITE-14684 Fixed node fail due to deleting DurableBackgroundTask's at the end of a checkpoint when stopping a node (#9091)
---
.../apache/ignite/internal/GridKernalContext.java | 2 +-
.../ignite/internal/GridKernalContextImpl.java | 2 +-
.../query/index/sorted/inline/InlineIndexImpl.java | 2 +-
.../persistence/checkpoint/CheckpointWorkflow.java | 3 +-
.../wal/reader/StandaloneGridKernalContext.java | 2 +-
.../cluster/GridClusterStateProcessor.java | 4 +-
.../localtask/DurableBackgroundTasksProcessor.java | 103 ++++++++++------
.../DurableBackgroundTasksProcessorSelfTest.java | 132 ++++++++++++++++++++-
.../localtask/ObservingCheckpointListener.java | 16 ++-
9 files changed, 213 insertions(+), 53 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index f73725f..a80c3eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -762,7 +762,7 @@ public interface GridKernalContext extends Iterable<GridComponent> {
/**
* @return Local continuous tasks processor.
*/
- public DurableBackgroundTasksProcessor durableBackgroundTasksProcessor();
+ public DurableBackgroundTasksProcessor durableBackgroundTask();
/**
* Return Thread pool for create/rebuild indexes.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 3fdc5f4..4c5d273 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -1311,7 +1311,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
}
/** {@inheritDoc} */
- @Override public DurableBackgroundTasksProcessor durableBackgroundTasksProcessor() {
+ @Override public DurableBackgroundTasksProcessor durableBackgroundTask() {
return durableBackgroundTasksProcessor;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java
index 841ef0d..3a91a34 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java
@@ -449,7 +449,7 @@ public class InlineIndexImpl extends AbstractIndex implements InlineIndex {
treeName
);
- cctx.kernalContext().durableBackgroundTasksProcessor().executeAsync(task, cctx.config());
+ cctx.kernalContext().durableBackgroundTask().executeAsync(task, cctx.config());
}
}
catch (IgniteCheckedException e) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java
index 21fcc00..e383b0b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java
@@ -74,7 +74,6 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.WorkProgressDispatcher;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
-import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedHashMap;
@@ -608,7 +607,7 @@ public class CheckpointWorkflow {
* @param checkpointedRegions Regions which will be checkpointed.
* @return Checkpoint listeners which should be handled.
*/
- @NotNull private List<CheckpointListener> getRelevantCheckpointListeners(Collection<DataRegion> checkpointedRegions) {
+ public List<CheckpointListener> getRelevantCheckpointListeners(Collection<DataRegion> checkpointedRegions) {
return lsnrs.entrySet().stream()
.filter(entry -> entry.getValue() == NO_REGION || checkpointedRegions.contains(entry.getValue()))
.map(Map.Entry::getKey)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index 0178db4..2951f1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -728,7 +728,7 @@ public class StandaloneGridKernalContext implements GridKernalContext {
}
/** {@inheritDoc} */
- @Override public DurableBackgroundTasksProcessor durableBackgroundTasksProcessor() {
+ @Override public DurableBackgroundTasksProcessor durableBackgroundTask() {
return null;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index a3d1fdd..b7ff732 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -588,7 +588,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
ctx.cache().onStateChangeFinish(msg);
- ctx.durableBackgroundTasksProcessor().onStateChangeFinish(msg);
+ ctx.durableBackgroundTask().onStateChangeFinish(msg);
if (discoClusterState.lastState() == ACTIVE_READ_ONLY || globalState.state() == ACTIVE_READ_ONLY)
ctx.cache().context().readOnlyMode(globalState.state() == ACTIVE_READ_ONLY);
@@ -729,7 +729,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
nodeIds
);
- ctx.durableBackgroundTasksProcessor().onStateChangeStarted(msg);
+ ctx.durableBackgroundTask().onStateChangeStarted(msg);
if (msg.forceChangeBaselineTopology())
newState.setTransitionResult(msg.requestId(), msg.state());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
index 9fb998f..8cee313 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendi
import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.apache.ignite.internal.util.GridBusyLock;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -74,6 +75,9 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem
/** Prohibiting the execution of tasks. */
private volatile boolean prohibitionExecTasks = true;
+ /** Node stop lock. */
+ private final GridBusyLock stopLock = new GridBusyLock();
+
/**
* Constructor.
*
@@ -91,23 +95,33 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem
/** {@inheritDoc} */
@Override public void onKernalStop(boolean cancel) {
cancelTasks();
+
+ stopLock.block();
}
/** {@inheritDoc} */
@Override public void onReadyForRead(ReadOnlyMetastorage metastorage) {
- metaStorageOperation(metaStorage -> {
- assert metaStorage != null;
-
- metaStorage.iterate(
- TASK_PREFIX,
- (k, v) -> {
- DurableBackgroundTask t = (DurableBackgroundTask)v;
-
- tasks.put(t.name(), new DurableBackgroundTaskState(t, null, true));
- },
- true
- );
- });
+ if (!stopLock.enterBusy())
+ return;
+
+ try {
+ metaStorageOperation(metaStorage -> {
+ assert metaStorage != null;
+
+ metaStorage.iterate(
+ TASK_PREFIX,
+ (k, v) -> {
+ DurableBackgroundTask t = (DurableBackgroundTask)v;
+
+ tasks.put(t.name(), new DurableBackgroundTaskState(t, null, true));
+ },
+ true
+ );
+ });
+ }
+ finally {
+ stopLock.leaveBusy();
+ }
}
/** {@inheritDoc} */
@@ -144,15 +158,25 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem
/** {@inheritDoc} */
@Override public void afterCheckpointEnd(Context ctx) {
- for (Iterator<DurableBackgroundTask> it = toRmv.values().iterator(); it.hasNext(); ) {
- DurableBackgroundTask t = it.next();
+ if (!stopLock.enterBusy())
+ return;
- metaStorageOperation(metaStorage -> {
- if (metaStorage != null && toRmv.containsKey(t.name()))
- metaStorage.remove(metaStorageKey(t));
- });
+ try {
+ for (Iterator<DurableBackgroundTask> it = toRmv.values().iterator(); it.hasNext(); ) {
+ DurableBackgroundTask t = it.next();
+
+ metaStorageOperation(metaStorage -> {
+ if (metaStorage != null) {
+ if (!tasks.containsKey(t.name()))
+ metaStorage.remove(metaStorageKey(t));
- it.remove();
+ it.remove();
+ }
+ });
+ }
+ }
+ finally {
+ stopLock.leaveBusy();
}
}
@@ -200,27 +224,34 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem
* @return Futures that will complete when the task is completed.
*/
public IgniteInternalFuture<Void> executeAsync(DurableBackgroundTask task, boolean save) {
- DurableBackgroundTaskState taskState = tasks.compute(task.name(), (taskName, prev) -> {
- if (prev != null && prev.state() != COMPLETED)
- throw new IllegalArgumentException("Task is already present and has not been completed: " + taskName);
-
- if (save)
- toRmv.remove(taskName);
+ if (!stopLock.enterBusy())
+ throw new IgniteException("Node is stopping.");
- return new DurableBackgroundTaskState(task, new GridFutureAdapter<>(), save);
- });
+ try {
+ DurableBackgroundTaskState taskState = tasks.compute(task.name(), (taskName, prev) -> {
+ if (prev != null && prev.state() != COMPLETED) {
+ throw new IllegalArgumentException("Task is already present and has not been completed: " +
+ taskName);
+ }
- if (save) {
- metaStorageOperation(metaStorage -> {
- if (metaStorage != null)
- metaStorage.write(metaStorageKey(task), task);
+ return new DurableBackgroundTaskState(task, new GridFutureAdapter<>(), save);
});
- }
- if (!prohibitionExecTasks)
- executeAsync0(task);
+ if (save) {
+ metaStorageOperation(metaStorage -> {
+ if (metaStorage != null)
+ metaStorage.write(metaStorageKey(task), task);
+ });
+ }
+
+ if (!prohibitionExecTasks)
+ executeAsync0(task);
- return taskState.outFuture();
+ return taskState.outFuture();
+ }
+ finally {
+ stopLock.leaveBusy();
+ }
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessorSelfTest.java
index ba0da72..8b389b8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessorSelfTest.java
@@ -17,8 +17,11 @@
package org.apache.ignite.internal.processors.localtask;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -26,8 +29,11 @@ import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointWorkflow;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult;
import org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.IgniteThrowableFunction;
@@ -45,6 +51,7 @@ import static org.apache.ignite.internal.processors.localtask.DurableBackgroundT
import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTasksProcessor.metaStorageKey;
import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
/**
* Class for testing the {@link DurableBackgroundTasksProcessor}.
@@ -171,6 +178,111 @@ public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractT
}
/**
+ * Checks that stopping the node does not fail the node when deleting tasks.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testNotFailNodeWhenNodeStoppindAndDeleteTasks() throws Exception {
+ IgniteEx n = startGrid(0);
+
+ ObservingCheckpointListener observingCpLsnr = new ObservingCheckpointListener();
+ dbMgr(n).addCheckpointListener(observingCpLsnr);
+
+ n.cluster().state(ACTIVE);
+
+ CheckpointWorkflow cpWorkflow = checkpointWorkflow(n);
+ List<CheckpointListener> cpLs = cpWorkflow.getRelevantCheckpointListeners(dbMgr(n).checkpointedDataRegions());
+
+ assertTrue(cpLs.contains(observingCpLsnr));
+ assertTrue(cpLs.contains(durableBackgroundTask(n)));
+ assertTrue(cpLs.indexOf(observingCpLsnr) < cpLs.indexOf(durableBackgroundTask(n)));
+
+ SimpleTask simpleTask = new SimpleTask("t");
+ IgniteInternalFuture<Void> taskFut = durableBackgroundTask(n).executeAsync(simpleTask, true);
+
+ simpleTask.onExecFut.get(getTestTimeout());
+
+ GridFutureAdapter<Void> startStopFut = new GridFutureAdapter<>();
+ GridFutureAdapter<Void> finishStopFut = new GridFutureAdapter<>();
+
+ observingCpLsnr.repeatOnMarkCheckpointBeginConsumer = true;
+ observingCpLsnr.onMarkCheckpointBeginConsumer = ctx -> {
+ if (n.context().isStopping()) {
+ startStopFut.onDone();
+
+ finishStopFut.get(getTestTimeout());
+
+ observingCpLsnr.repeatOnMarkCheckpointBeginConsumer = false;
+ }
+ };
+
+ IgniteInternalFuture<Void> stopFut = runAsync(() -> stopAllGrids(false));
+
+ startStopFut.get(getTestTimeout());
+
+ simpleTask.taskFut.onDone(DurableBackgroundTaskResult.complete(null));
+ taskFut.get(getTestTimeout());
+
+ finishStopFut.onDone();
+
+ stopFut.get(getTestTimeout());
+ assertNull(n.context().failure().failureContext());
+
+ assertThrows(log, () -> durableBackgroundTask(n).executeAsync(simpleTask, true), IgniteException.class, null);
+ }
+
+ /**
+ * Check that the task will not be deleted from the MetaStorage if it was restarted.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDontDeleteTaskIfItsRestart() throws Exception {
+ IgniteEx n = startGrid(0);
+
+ ObservingCheckpointListener observingCpLsnr = new ObservingCheckpointListener();
+ dbMgr(n).addCheckpointListener(observingCpLsnr);
+
+ n.cluster().state(ACTIVE);
+
+ CheckpointWorkflow cpWorkflow = checkpointWorkflow(n);
+ List<CheckpointListener> cpLs = cpWorkflow.getRelevantCheckpointListeners(dbMgr(n).checkpointedDataRegions());
+
+ assertTrue(cpLs.contains(observingCpLsnr));
+ assertTrue(cpLs.contains(durableBackgroundTask(n)));
+ assertTrue(cpLs.indexOf(observingCpLsnr) < cpLs.indexOf(durableBackgroundTask(n)));
+
+ SimpleTask simpleTask0 = new SimpleTask("t");
+ IgniteInternalFuture<Void> taskFut = durableBackgroundTask(n).executeAsync(simpleTask0, true);
+
+ simpleTask0.onExecFut.get(getTestTimeout());
+
+ dbMgr(n).enableCheckpoints(false).get(getTestTimeout());
+
+ simpleTask0.taskFut.onDone(DurableBackgroundTaskResult.complete(null));
+ taskFut.get(getTestTimeout());
+
+ SimpleTask simpleTask1 = new SimpleTask("t");
+ AtomicReference<IgniteInternalFuture<Void>> taskFutRef = new AtomicReference<>();
+
+ observingCpLsnr.afterCheckpointEndConsumer =
+ ctx -> taskFutRef.set(durableBackgroundTask(n).executeAsync(simpleTask1, true));
+
+ dbMgr(n).enableCheckpoints(true).get(getTestTimeout());
+ forceCheckpoint();
+
+ assertNotNull(metaStorageOperation(n, ms -> ms.read(metaStorageKey(simpleTask0))));
+
+ simpleTask1.onExecFut.get(getTestTimeout());
+ simpleTask1.taskFut.onDone(DurableBackgroundTaskResult.complete(null));
+ taskFutRef.get().get(getTestTimeout());
+
+ forceCheckpoint();
+ assertNull(metaStorageOperation(n, ms -> ms.read(metaStorageKey(simpleTask1))));
+ }
+
+ /**
* Check that the task will be restarted correctly.
*
* @param save Save to MetaStorage.
@@ -342,7 +454,7 @@ public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractT
* @return Task future.
*/
private IgniteInternalFuture<Void> execAsync(IgniteEx n, DurableBackgroundTask t, boolean save) {
- return durableBackgroundTasksProcessor(n).executeAsync(t, save);
+ return durableBackgroundTask(n).executeAsync(t, save);
}
/**
@@ -352,7 +464,7 @@ public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractT
* @return Map of tasks that will be removed from the MetaStorage.
*/
private Map<String, DurableBackgroundTask> toRmv(IgniteEx n) {
- return getFieldValue(durableBackgroundTasksProcessor(n), "toRmv");
+ return getFieldValue(durableBackgroundTask(n), "toRmv");
}
/**
@@ -362,7 +474,7 @@ public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractT
* @return Task states map.
*/
private Map<String, DurableBackgroundTaskState> tasks(IgniteEx n) {
- return getFieldValue(durableBackgroundTasksProcessor(n), "tasks");
+ return getFieldValue(durableBackgroundTask(n), "tasks");
}
/**
@@ -371,8 +483,8 @@ public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractT
* @param n Node.
* @return Durable background task processor.
*/
- private DurableBackgroundTasksProcessor durableBackgroundTasksProcessor(IgniteEx n) {
- return n.context().durableBackgroundTasksProcessor();
+ private DurableBackgroundTasksProcessor durableBackgroundTask(IgniteEx n) {
+ return n.context().durableBackgroundTask();
}
/**
@@ -398,4 +510,14 @@ public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractT
dbMgr.checkpointReadUnlock();
}
}
+
+ /**
+ * Getting {@code CheckpointManager#checkpointWorkflow}.
+ *
+ * @param n Node.
+ * @return Checkpoint workflow.
+ */
+ private CheckpointWorkflow checkpointWorkflow(IgniteEx n) {
+ return getFieldValue(dbMgr(n), "checkpointManager", "checkpointWorkflow");
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/ObservingCheckpointListener.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/ObservingCheckpointListener.java
index b86ed09..b7b1bbd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/ObservingCheckpointListener.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/ObservingCheckpointListener.java
@@ -30,17 +30,24 @@ public class ObservingCheckpointListener implements CheckpointListener {
/** On mark checkpoint begin consumer. */
volatile IgniteThrowableConsumer<Context> onMarkCheckpointBeginConsumer;
+ /** Repeat {@link #onMarkCheckpointBeginConsumer}. */
+ volatile boolean repeatOnMarkCheckpointBeginConsumer;
+
/** After checkpoint end consumer. */
volatile IgniteThrowableConsumer<Context> afterCheckpointEndConsumer;
+ /** Repeat {@link #afterCheckpointEndConsumer}. */
+ volatile boolean repeatAfterCheckpointEndConsumer;
+
/** {@inheritDoc} */
@Override public void onMarkCheckpointBegin(Context ctx) throws IgniteCheckedException {
IgniteThrowableConsumer<Context> consumer = onMarkCheckpointBeginConsumer;
if (consumer != null) {
- onMarkCheckpointBeginConsumer = null;
-
consumer.accept(ctx);
+
+ if (!repeatOnMarkCheckpointBeginConsumer)
+ onMarkCheckpointBeginConsumer = null;
}
}
@@ -59,9 +66,10 @@ public class ObservingCheckpointListener implements CheckpointListener {
IgniteThrowableConsumer<Context> consumer = afterCheckpointEndConsumer;
if (consumer != null) {
- afterCheckpointEndConsumer = null;
-
consumer.accept(ctx);
+
+ if (!repeatAfterCheckpointEndConsumer)
+ afterCheckpointEndConsumer = null;
}
}