You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2020/01/20 10:33:43 UTC

[ignite] branch master updated: IGNITE-12463 Align checkpoint progress future state with the checkpoint state - Fixes #7153.

This is an automated email from the ASF dual-hosted git repository.

agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new f9a15d9  IGNITE-12463 Align checkpoint progress future state with the checkpoint state - Fixes #7153.
f9a15d9 is described below

commit f9a15d987e584076dae7dacba25c5f4102c97e37
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Mon Jan 20 13:27:55 2020 +0300

    IGNITE-12463 Align checkpoint progress future state with the checkpoint state - Fixes #7153.
---
 .../DiskPageCompressionIntegrationTest.java        |   5 +-
 .../internal/processors/cache/WalStateManager.java |  24 +-
 .../dht/topology/GridDhtLocalPartition.java        |   3 +-
 ...eckpointFuture.java => CheckpointProgress.java} |  17 +-
 ...{CheckpointFuture.java => CheckpointState.java} |  25 +--
 .../GridCacheDatabaseSharedManager.java            | 249 ++++++++-------------
 .../IgniteCacheDatabaseSharedManager.java          |   2 +-
 .../processors/cache/verify/IdleVerifyUtility.java |   8 +-
 .../persistence/db/wal/IgniteWalRecoveryTest.java  |   3 +-
 9 files changed, 138 insertions(+), 198 deletions(-)

diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java
index fb80054..fc1b566 100644
--- a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java
+++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java
@@ -49,6 +49,7 @@ import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.configuration.DataStorageConfiguration.MAX_PAGE_SIZE;
 import static org.apache.ignite.configuration.DiskPageCompression.ZSTD;
 import static org.apache.ignite.internal.processors.cache.CacheGroupMetricsImpl.CACHE_GROUP_METRICS_PREFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
 import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
 
 /**
@@ -113,7 +114,7 @@ public class DiskPageCompressionIntegrationTest extends AbstractPageCompressionI
         GridCacheDatabaseSharedManager dbMgr = ((GridCacheDatabaseSharedManager)ignite.context()
             .cache().context().database());
 
-        dbMgr.forceCheckpoint("test compression").finishFuture().get();
+        dbMgr.forceCheckpoint("test compression").futureFor(FINISHED).get();
 
         FilePageStoreManager storeMgr = dbMgr.getFileStoreManager();
 
@@ -212,7 +213,7 @@ public class DiskPageCompressionIntegrationTest extends AbstractPageCompressionI
             assertTrue(cache.putIfAbsent(i, new TestVal(i)));
 
             if (i % 50_000 == 0) {
-                dbMgr.forceCheckpoint("test").finishFuture().get();
+                dbMgr.forceCheckpoint("test").futureFor(FINISHED).get();
 
                 long sparse = mreg.<LongMetric>findMetric("SparseStorageSize").value();
                 long size = mreg.<LongMetric>findMetric("StorageSize").value();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
index 4f7ed20..044b702 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
@@ -42,7 +42,7 @@ import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.persistence.CheckpointFuture;
+import org.apache.ignite.internal.processors.cache.persistence.CheckpointProgress;
 import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
@@ -69,6 +69,8 @@ import static org.apache.ignite.internal.GridTopic.TOPIC_WAL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
+import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
+import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.LOCK_RELEASED;
 
 /**
  * Write-ahead log state manager. Manages WAL enable and disable.
@@ -455,7 +457,7 @@ public class WalStateManager extends GridCacheSharedManagerAdapter {
 
         try {
             if (hasNonEmptyOwning && !grpsToEnableWal.isEmpty())
-                triggerCheckpoint("wal-local-state-change-" + topVer).finishFuture().get();
+                triggerCheckpoint("wal-local-state-change-" + topVer).futureFor(FINISHED).get();
         }
         catch (IgniteCheckedException ex) {
             throw new IgniteException(ex);
@@ -501,12 +503,12 @@ public class WalStateManager extends GridCacheSharedManagerAdapter {
 
             // Pending updates in groups with disabled WAL are not protected from crash.
             // Need to trigger checkpoint for attempt to persist them.
-            CheckpointFuture cpFut = triggerCheckpoint("wal-local-state-changed-rebalance-finished-" + topVer);
+            CheckpointProgress cpFut = triggerCheckpoint("wal-local-state-changed-rebalance-finished-" + topVer);
 
             assert cpFut != null;
 
             // It's safe to switch partitions to owning state only if checkpoint was successfully finished.
-            cpFut.finishFuture().listen(new IgniteInClosureX<IgniteInternalFuture>() {
+            cpFut.futureFor(FINISHED).listen(new IgniteInClosureX<IgniteInternalFuture>() {
                 @Override public void applyx(IgniteInternalFuture future) {
                     if (X.hasCause(future.error(), NodeStoppingException.class))
                         return;
@@ -671,12 +673,12 @@ public class WalStateManager extends GridCacheSharedManagerAdapter {
                         res = new WalStateResult(msg, false);
                     else {
                         // Initiate a checkpoint.
-                        CheckpointFuture cpFut = triggerCheckpoint("wal-state-change-grp-" + msg.groupId());
+                        CheckpointProgress cpFut = triggerCheckpoint("wal-state-change-grp-" + msg.groupId());
 
                         if (cpFut != null) {
                             try {
                                 // Wait for checkpoint mark synchronously before releasing the control.
-                                cpFut.beginFuture().get();
+                                cpFut.futureFor(LOCK_RELEASED).get();
 
                                 if (msg.enable()) {
                                     grpCtx.globalWalEnabled(true);
@@ -1065,7 +1067,7 @@ public class WalStateManager extends GridCacheSharedManagerAdapter {
      * @param msg Message.
      * @return Checkpoint future or {@code null} if failed to get checkpointer.
      */
-    @Nullable private CheckpointFuture triggerCheckpoint(String msg) {
+    @Nullable private CheckpointProgress triggerCheckpoint(String msg) {
         return cctx.database().forceCheckpoint(msg);
     }
 
@@ -1076,14 +1078,14 @@ public class WalStateManager extends GridCacheSharedManagerAdapter {
      * @param msg Orignial message which triggered the process.
      * @return Result.
      */
-    private WalStateResult awaitCheckpoint(CheckpointFuture cpFut, WalStateProposeMessage msg) {
+    private WalStateResult awaitCheckpoint(CheckpointProgress cpFut, WalStateProposeMessage msg) {
         WalStateResult res;
 
         try {
             assert msg.affinityNode();
 
             if (cpFut != null)
-                cpFut.finishFuture().get();
+                cpFut.futureFor(FINISHED).get();
 
             res = new WalStateResult(msg, true);
         }
@@ -1139,14 +1141,14 @@ public class WalStateManager extends GridCacheSharedManagerAdapter {
         private final WalStateProposeMessage msg;
 
         /** Checkpoint future. */
-        private final CheckpointFuture cpFut;
+        private final CheckpointProgress cpFut;
 
         /**
          * Constructor.
          *
          * @param msg Propose message.
          */
-        private WalStateChangeWorker(WalStateProposeMessage msg, CheckpointFuture cpFut) {
+        private WalStateChangeWorker(WalStateProposeMessage msg, CheckpointProgress cpFut) {
             super(cctx.igniteInstanceName(), "wal-state-change-worker-" + msg.groupId(), WalStateManager.this.log);
 
             this.msg = msg;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
index a45aa92..f6eb74a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
@@ -80,6 +80,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.topolo
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING;
+import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
 
 /**
  * Key partition.
@@ -1181,7 +1182,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
 
             if (forceTestCheckpointOnEviction) {
                 if (partWhereTestCheckpointEnforced == null && cleared >= fullSize()) {
-                    ctx.database().forceCheckpoint("test").finishFuture().get();
+                    ctx.database().forceCheckpoint("test").futureFor(FINISHED).get();
 
                     log.warning("Forced checkpoint by test reasons for partition: " + this);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointProgress.java
similarity index 76%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointFuture.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointProgress.java
index 381bb55..52db794b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointProgress.java
@@ -20,16 +20,13 @@ package org.apache.ignite.internal.processors.cache.persistence;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 
 /**
- * Checkpoint futures.
+ * Represents the progress of a given checkpoint and
+ * allows obtaining a future to wait for a particular checkpoint state.
  */
-public interface CheckpointFuture {
-    /**
-     * @return Begin future.
-     */
-    public GridFutureAdapter beginFuture();
+public interface CheckpointProgress {
+    /** */
+    public boolean inProgress();
 
-    /**
-     * @return Finish future.
-     */
-    public GridFutureAdapter<Object> finishFuture();
+    /** @return Future which can be used to wait for the given checkpoint state. */
+    public GridFutureAdapter futureFor(CheckpointState state);
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointState.java
similarity index 64%
rename from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointFuture.java
rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointState.java
index 381bb55..dbecf43 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointState.java
@@ -17,19 +17,18 @@
 
 package org.apache.ignite.internal.processors.cache.persistence;
 
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-
 /**
- * Checkpoint futures.
+ * Possible checkpoint states. Ordinal is important. Every next state follows the previous one.
  */
-public interface CheckpointFuture {
-    /**
-     * @return Begin future.
-     */
-    public GridFutureAdapter beginFuture();
-
-    /**
-     * @return Finish future.
-     */
-    public GridFutureAdapter<Object> finishFuture();
+public enum CheckpointState {
+    /** Checkpoint is waiting for execution. **/
+    SCHEDULED,
+    /** Checkpoint was awakened and it is preparing to start. **/
+    LOCK_TAKEN,
+    /** Checkpoint counted the pages and write lock was released. **/
+    LOCK_RELEASED,
+    /** Checkpoint marker was stored to disk. **/
+    MARKER_STORED_TO_DISK,
+    /** Checkpoint was finished. **/
+    FINISHED
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index fe11489..8602fe1 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -207,10 +207,10 @@ import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.MASTER_KEY_CHANGE_RECORD;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.METASTORE_DATA_RECORD;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.fromOrdinal;
-import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.CheckpointProgress.State.FINISHED;
-import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.CheckpointProgress.State.LOCK_RELEASED;
-import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.CheckpointProgress.State.LOCK_TAKEN;
-import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.CheckpointProgress.State.MARKER_STORED_TO_DISK;
+import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
+import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.LOCK_RELEASED;
+import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.LOCK_TAKEN;
+import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.MARKER_STORED_TO_DISK;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_FILE_MATCHER;
 import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getType;
 import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getVersion;
@@ -1378,8 +1378,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
                         checkpointer = null;
 
-                        cp.scheduledCp.cpFinishFut.onDone(
-                            new NodeStoppingException("Checkpointer is stopped during node stop."));
+                        cp.scheduledCp.fail(new NodeStoppingException("Checkpointer is stopped during node stop."));
 
                         break;
                     }
@@ -1646,7 +1645,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                             failCheckpointReadLock();
 
                         try {
-                            checkpointer.wakeupForCheckpoint(0, "too many dirty pages").cpBeginFut
+                            checkpointer.wakeupForCheckpoint(0, "too many dirty pages")
+                                .futureFor(LOCK_RELEASED)
                                 .getUninterruptibly();
                         }
                         catch (IgniteFutureTimeoutCheckedException e) {
@@ -1873,7 +1873,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         Checkpointer cp = checkpointer;
 
         if (cp != null)
-            return cp.wakeupForCheckpoint(0, reason).cpBeginFut;
+            return cp.wakeupForCheckpoint(0, reason).futureFor(LOCK_RELEASED);
 
         return null;
     }
@@ -1886,24 +1886,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         if (cp == null)
             return;
 
-        CheckpointProgressSnapshot progSnapshot = cp.wakeupForCheckpoint(0, reason, lsnr);
-
-        IgniteInternalFuture fut1 = progSnapshot.cpFinishFut;
-
-        fut1.get();
-
-        if (!progSnapshot.started)
-            return;
-
-        IgniteInternalFuture fut2 = cp.wakeupForCheckpoint(0, reason).cpFinishFut;
-
-        assert fut1 != fut2;
-
-        fut2.get();
+        cp.wakeupForCheckpoint(0, reason, lsnr).futureFor(FINISHED).get();
     }
 
     /** {@inheritDoc} */
-    @Override public CheckpointFuture forceCheckpoint(String reason) {
+    @Override public CheckpointProgress forceCheckpoint(String reason) {
         Checkpointer cp = checkpointer;
 
         if (cp == null)
@@ -2179,10 +2166,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
         checkpointerThread = cpThread;
 
-        CheckpointProgressSnapshot chp = checkpointer.wakeupForCheckpoint(0, "node started");
+        CheckpointProgress chp = checkpointer.wakeupForCheckpoint(0, "node started");
 
         if (chp != null)
-            chp.cpBeginFut.get();
+            chp.futureFor(LOCK_RELEASED).get();
     }
 
     /**
@@ -3522,10 +3509,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         private final ByteBuffer tmpWriteBuf;
 
         /** Next scheduled checkpoint progress. */
-        private volatile CheckpointProgress scheduledCp;
+        private volatile CheckpointProgressImpl scheduledCp;
 
         /** Current checkpoint. This field is updated only by checkpoint thread. */
-        @Nullable private volatile CheckpointProgress curCpProgress;
+        @Nullable private volatile CheckpointProgressImpl curCpProgress;
 
         /** Shutdown now. */
         private volatile boolean shutdownNow;
@@ -3548,7 +3535,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         protected Checkpointer(@Nullable String gridName, String name, IgniteLogger log) {
             super(gridName, name, log, cctx.kernalContext().workersRegistry());
 
-            scheduledCp = new CheckpointProgress(checkpointFreq);
+            scheduledCp = new CheckpointProgressImpl(checkpointFreq);
 
             tmpWriteBuf = ByteBuffer.allocateDirect(pageSize());
 
@@ -3603,7 +3590,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             catch (Throwable t) {
                 err = t;
 
-                scheduledCp.cpFinishFut.onDone(t);
+                scheduledCp.fail(t);
 
                 throw t;
             }
@@ -3616,21 +3603,21 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 else if (err != null)
                     cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));
 
-                scheduledCp.cpFinishFut.onDone(new NodeStoppingException("Node is stopping."));
+                scheduledCp.fail(new NodeStoppingException("Node is stopping."));
             }
         }
 
         /**
          *
          */
-        private CheckpointProgressSnapshot wakeupForCheckpoint(long delayFromNow, String reason) {
+        private CheckpointProgress wakeupForCheckpoint(long delayFromNow, String reason) {
             return wakeupForCheckpoint(delayFromNow, reason, null);
         }
 
         /**
          *
          */
-        private <R> CheckpointProgressSnapshot wakeupForCheckpoint(
+        private <R> CheckpointProgress wakeupForCheckpoint(
             long delayFromNow,
             String reason,
             IgniteInClosure<? super IgniteInternalFuture<R>> lsnr
@@ -3638,20 +3625,18 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             if (lsnr != null) {
                 //To be sure lsnr always will be executed in checkpoint thread.
                 synchronized (this) {
-                    CheckpointProgress sched = scheduledCp;
+                    CheckpointProgressImpl sched = scheduledCp;
 
-                    sched.cpFinishFut.listen(lsnr);
+                    sched.futureFor(FINISHED).listen(lsnr);
                 }
             }
 
-            CheckpointProgress sched = scheduledCp;
+            CheckpointProgressImpl sched = scheduledCp;
 
             long nextNanos = System.nanoTime() + U.millisToNanos(delayFromNow);
 
             if (sched.nextCpNanos - nextNanos <= 0)
-                return new CheckpointProgressSnapshot(sched);
-
-            CheckpointProgressSnapshot ret;
+                return sched;
 
             synchronized (this) {
                 sched = scheduledCp;
@@ -3662,12 +3647,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                     sched.nextCpNanos = nextNanos;
                 }
 
-                ret = new CheckpointProgressSnapshot(sched);
-
                 notifyAll();
             }
 
-            return ret;
+            return sched;
         }
 
         /**
@@ -3685,7 +3668,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
                 scheduledCp.snapshotOperation = snapshotOperation;
 
-                ret = scheduledCp.cpBeginFut;
+                ret = scheduledCp.futureFor(LOCK_RELEASED);
 
                 notifyAll();
             }
@@ -3707,7 +3690,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 }
                 catch (Exception e) {
                     if (curCpProgress != null)
-                        curCpProgress.cpFinishFut.onDone(e);
+                        curCpProgress.fail(e);
 
                     // In case of checkpoint initialization error node should be invalidated and stopped.
                     cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
@@ -3794,7 +3777,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                         // Must re-check shutdown flag here because threads may have skipped some pages.
                         // If so, we should not put finish checkpoint mark.
                         if (shutdownNow) {
-                            chp.progress.cpFinishFut.onDone(new NodeStoppingException("Node is stopping."));
+                            chp.progress.fail(new NodeStoppingException("Node is stopping."));
 
                             return;
                         }
@@ -3804,7 +3787,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                         if (!skipSync) {
                             for (Map.Entry<PageStore, LongAdder> updStoreEntry : updStores.entrySet()) {
                                 if (shutdownNow) {
-                                    chp.progress.cpFinishFut.onDone(new NodeStoppingException("Node is stopping."));
+                                    chp.progress.fail(new NodeStoppingException("Node is stopping."));
 
                                     return;
                                 }
@@ -3866,7 +3849,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             }
             catch (IgniteCheckedException e) {
                 if (chp != null)
-                    chp.progress.cpFinishFut.onDone(e);
+                    chp.progress.fail(e);
 
                 cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
             }
@@ -4013,7 +3996,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             if (req != null)
                 req.waitCompleted();
 
-            CheckpointProgress cur;
+            CheckpointProgressImpl cur;
 
             synchronized (this) {
                 cur = curCpProgress;
@@ -4070,7 +4053,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         private Checkpoint markCheckpointBegin(CheckpointMetricsTracker tracker) throws IgniteCheckedException {
             long cpTs = updateLastCheckpointTime();
 
-            CheckpointProgress curr = scheduledCp;
+            CheckpointProgressImpl curr = scheduledCp;
 
             CheckpointRecord cpRec = new CheckpointRecord(memoryRecoveryRecordPtr);
 
@@ -4123,7 +4106,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 fillCacheGroupState(cpRec);
 
                 //There are allowable to replace pages only after checkpoint entry was stored to disk.
-                cpPagesTuple = beginAllCheckpoints(curr.cpMarkerStored);
+                cpPagesTuple = beginAllCheckpoints(curr.futureFor(MARKER_STORED_TO_DISK));
 
                 hasPages = hasPageForWrite(cpPagesTuple.get1());
 
@@ -4159,7 +4142,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
             DbCheckpointListener.Context ctx = createOnCheckpointBeginContext(ctx0, hasPages);
 
-            curr.cpBeginFut.onDone();
+            curr.transitTo(LOCK_RELEASED);
 
             for (DbCheckpointListener lsnr : lsnrs)
                 lsnr.onCheckpointBegin(ctx);
@@ -4187,7 +4170,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
                 writeCheckpointEntry(tmpWriteBuf, cp, CheckpointEntryType.START);
 
-                curr.cpMarkerStored.onDone();
+                curr.transitTo(MARKER_STORED_TO_DISK);
 
                 tracker.onSplitAndSortCpPagesStart();
 
@@ -4365,19 +4348,19 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
          *
          * @return Current checkpoint progress.
          */
-        @NotNull private GridCacheDatabaseSharedManager.CheckpointProgress updateCurrentCheckpointProgress() {
-            final CheckpointProgress curr;
+        @NotNull private CheckpointProgress updateCurrentCheckpointProgress() {
+            final CheckpointProgressImpl curr;
 
             synchronized (this) {
                 curr = scheduledCp;
 
-                curr.state(LOCK_TAKEN);
+                curr.transitTo(LOCK_TAKEN);
 
                 if (curr.reason == null)
                     curr.reason = "timeout";
 
                 // It is important that we assign a new progress object before checkpoint mark in page memory.
-                scheduledCp = new CheckpointProgress(checkpointFreq);
+                scheduledCp = new CheckpointProgressImpl(checkpointFreq);
 
                 curCpProgress = curr;
             }
@@ -4503,7 +4486,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 removeCheckpointFiles(cp);
 
             if (chp.progress != null)
-                chp.progress.cpFinishFut.onDone();
+                chp.progress.transitTo(FINISHED);
         }
 
         /** {@inheritDoc} */
@@ -4534,7 +4517,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
          */
         private class DbCheckpointContextImpl implements DbCheckpointListener.Context {
             /** Current checkpoint progress. */
-            private final CheckpointProgress curr;
+            private final CheckpointProgressImpl curr;
 
             /** Partition map. */
             private final PartitionAllocationMap map;
@@ -4546,7 +4529,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
              * @param curr Current checkpoint progress.
              * @param map Partition map.
              */
-            private DbCheckpointContextImpl(CheckpointProgress curr, PartitionAllocationMap map) {
+            private DbCheckpointContextImpl(CheckpointProgressImpl curr, PartitionAllocationMap map) {
                 this.curr = curr;
                 this.map = map;
                 this.pendingTaskFuture = asyncRunner == null ? null : new GridCompoundFuture();
@@ -4907,7 +4890,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         private final GridMultiCollectionWrapper<FullPageId> cpPages;
 
         /** */
-        private final CheckpointProgress progress;
+        private final CheckpointProgressImpl progress;
 
         /** Number of deleted WAL files. */
         private int walFilesDeleted;
@@ -4926,7 +4909,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         private Checkpoint(
             @Nullable CheckpointEntry cpEntry,
             @NotNull GridMultiCollectionWrapper<FullPageId> cpPages,
-            CheckpointProgress progress
+            CheckpointProgressImpl progress
         ) {
             this.cpEntry = cpEntry;
             this.cpPages = cpPages;
@@ -5015,49 +4998,22 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     /**
      * Data class representing the state of running/scheduled checkpoint.
      */
-    public static class CheckpointProgress {
+    public static class CheckpointProgressImpl implements CheckpointProgress {
         /** Scheduled time of checkpoint. */
         private volatile long nextCpNanos;
 
-        /** Checkpoint begin phase future. TODO it should be encapsulated. */
-        private GridFutureAdapter cpBeginFut = new GridFutureAdapter<Void>() {
-            @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err, boolean cancel) {
-                CheckpointProgress.this.state(LOCK_RELEASED);
-
-                return super.onDone(res, err, cancel);
-            }
-        };
-
-        /** Checkpoint marker stored to disk phase future. TODO it should be encapsulated. */
-        private GridFutureAdapter cpMarkerStored = new GridFutureAdapter<Void>() {
-            @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err, boolean cancel) {
-                CheckpointProgress.this.state(MARKER_STORED_TO_DISK);
-
-                return super.onDone(res, err, cancel);
-            }
-        };
-
-        /** Checkpoint finish phase future. TODO it should be encapsulated. */
-        private GridFutureAdapter cpFinishFut = new GridFutureAdapter<Void>() {
-            @Override protected boolean onDone(@Nullable Void res, @Nullable Throwable err, boolean cancel) {
-                if (err != null && !cpBeginFut.isDone())
-                    cpBeginFut.onDone(err);
-
-                if (err != null && !cpMarkerStored.isDone())
-                    cpMarkerStored.onDone(err);
+        /** Current checkpoint state. */
+        private volatile AtomicReference<CheckpointState> state = new AtomicReference(CheckpointState.SCHEDULED);
 
-                CheckpointProgress.this.state(FINISHED);
+        /** Futures which will be finished when the corresponding state is set. */
+        private final Map<CheckpointState, GridFutureAdapter> stateFutures = new ConcurrentHashMap<>();
 
-                return super.onDone(res, err, cancel);
-            }
-        };
+        /** Cause of the failure which happened during the checkpoint, or null if checkpoint was successful. */
+        private volatile Throwable failCause;
 
         /** Flag indicates that snapshot operation will be performed after checkpoint. */
         private volatile boolean nextSnapshot;
 
-        /** Current checkpoint state. */
-        private volatile AtomicReference<State> state = new AtomicReference(State.SCHEDULED);
-
         /** Snapshot operation that should be performed if {@link #nextSnapshot} set to true. */
         private volatile SnapshotOperation snapshotOperation;
 
@@ -5070,94 +5026,79 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         /**
          * @param cpFreq Timeout until next checkpoint.
          */
-        private CheckpointProgress(long cpFreq) {
+        private CheckpointProgressImpl(long cpFreq) {
             this.nextCpNanos = System.nanoTime() + U.millisToNanos(cpFreq);
         }
 
         /**
-         * @return {@code true} if checkpoint is stated.
-         * @deprecated For legacy reason.
+         * @return {@code true} if checkpoint has already started but has not finished yet.
          */
-        @Deprecated
-        public boolean inProgress() {
-            return state.get().ordinal() >= State.LOCK_TAKEN.ordinal();
-        }
-
-        /** */
-        public boolean started() {
-            return cpBeginFut.isDone();
-        }
-
-        /** */
-        public boolean finished() {
-            return cpFinishFut.isDone();
+        @Override public boolean inProgress() {
+            return greaterOrEqualTo(LOCK_RELEASED) && !greaterOrEqualTo(FINISHED);
         }
 
         /**
          * @param expectedState Expected state.
          * @return {@code true} if current state equal to given state.
          */
-        public boolean atLeastState(State expectedState) {
+        public boolean greaterOrEqualTo(CheckpointState expectedState) {
             return state.get().ordinal() >= expectedState.ordinal();
         }
 
         /**
-         * Changing checkpoint state if order of state is correct.
-         *
-         * @param newState New checkpoint state.
+         * @param state State for which future should be returned.
+         * @return Existing or new future which corresponds to the given state.
          */
-        public void state(@NotNull State newState) {
-            State state = this.state.get();
+        @Override public GridFutureAdapter futureFor(CheckpointState state) {
+            GridFutureAdapter stateFut = stateFutures.computeIfAbsent(state, (k) -> new GridFutureAdapter());
 
-            if (state.ordinal() < newState.ordinal())
-                this.state.compareAndSet(state, newState);
+            if (greaterOrEqualTo(state) && !stateFut.isDone())
+                stateFut.onDone(failCause);
+
+            return stateFut;
         }
 
         /**
-         * Possible checkpoint states. Ordinal is important. Every next state follows the previous one.
+         * Mark this checkpoint execution as failed.
+         *
+         * @param error Error which caused the failure.
          */
-        enum State {
-            /** Checkpoint is waiting to execution. **/
-            SCHEDULED,
-            /** Checkpoint was awakened and it is preparing to start. **/
-            LOCK_TAKEN,
-            /** Checkpoint counted the pages and write lock was released. **/
-            LOCK_RELEASED,
-            /** Checkpoint marker was stored to disk. **/
-            MARKER_STORED_TO_DISK,
-            /** Checkpoint was finished. **/
-            FINISHED
-        }
-    }
+        public void fail(Throwable error) {
+            failCause = error;
 
-    /**
-     *
-     */
-    private static class CheckpointProgressSnapshot implements CheckpointFuture {
-        /** */
-        private final boolean started;
+            transitTo(FINISHED);
+        }
 
-        /** */
-        private final GridFutureAdapter<Object> cpBeginFut;
+        /**
+         * Changing checkpoint state if order of state is correct.
+         *
+         * @param newState New checkpoint state.
+         */
+        public void transitTo(@NotNull CheckpointState newState) {
+            CheckpointState state = this.state.get();
 
-        /** */
-        private final GridFutureAdapter<Object> cpFinishFut;
+            if (state.ordinal() < newState.ordinal()) {
+                this.state.compareAndSet(state, newState);
 
-        /** */
-        CheckpointProgressSnapshot(CheckpointProgress cpProgress) {
-            started = cpProgress.inProgress();
-            cpBeginFut = cpProgress.cpBeginFut;
-            cpFinishFut = cpProgress.cpFinishFut;
+                doFinishFuturesWhichLessOrEqualTo(newState);
+            }
         }
 
-        /** {@inheritDoc} */
-        @Override public GridFutureAdapter beginFuture() {
-            return cpBeginFut;
-        }
+        /**
+         * Finishes futures with the correct result in direct state order up to lastState (inclusive).
+         *
+         * @param lastState State until which futures should be done.
+         */
+        private void doFinishFuturesWhichLessOrEqualTo(@NotNull CheckpointState lastState) {
+            for (CheckpointState old : CheckpointState.values()) {
+                GridFutureAdapter fut = stateFutures.get(old);
 
-        /** {@inheritDoc} */
-        @Override public GridFutureAdapter<Object> finishFuture() {
-            return cpFinishFut;
+                if (fut != null && !fut.isDone())
+                    fut.onDone(failCause);
+
+                if (old == lastState)
+                    return;
+            }
         }
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index 865a09e..bfdf7f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -926,7 +926,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
      *
      * @param reason Reason.
      */
-    @Nullable public CheckpointFuture forceCheckpoint(String reason) {
+    @Nullable public CheckpointProgress forceCheckpoint(String reason) {
         return null;
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
index d386ec3..0e15efc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
@@ -24,6 +24,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.pagemem.PageIdAllocator;
 import org.apache.ignite.internal.pagemem.PageIdUtils;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.persistence.CheckpointProgress;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
@@ -104,13 +105,10 @@ public class IdleVerifyUtility {
         if (!(db instanceof GridCacheDatabaseSharedManager))
             return false;
 
-        GridCacheDatabaseSharedManager.CheckpointProgress progress =
+        CheckpointProgress progress =
             ((GridCacheDatabaseSharedManager)db).getCheckpointer().currentProgress();
 
-        if (progress == null)
-            return false;
-
-        return progress.started() && !progress.finished();
+        return progress != null && progress.inProgress();
     }
 
     /** */
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
index 65c1bed..40fdea7 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
@@ -117,6 +117,7 @@ import org.junit.Test;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS;
 import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_CHECKPOINT_FREQ;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
+import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DATA_FILENAME;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
 
@@ -521,7 +522,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
         GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)ig2
             .context().cache().context().database();
 
-        IgniteInternalFuture<?> cpFinishFut = dbMgr.forceCheckpoint("force checkpoint").finishFuture();
+        IgniteInternalFuture<?> cpFinishFut = dbMgr.forceCheckpoint("force checkpoint").futureFor(FINISHED);
 
         // Delete checkpoint END file to emulate node stopped at the middle of checkpoint.
         cpFinishFut.listen(new IgniteInClosureX<IgniteInternalFuture>() {