You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@ignite.apache.org by ag...@apache.org on 2020/01/20 10:33:43 UTC
[ignite] branch master updated: IGNITE-12463 Align checkpoint progress future state with the checkpoint state - Fixes #7153.
This is an automated email from the ASF dual-hosted git repository.
agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new f9a15d9 IGNITE-12463 Align checkpoint progress future state with the checkpoint state - Fixes #7153.
f9a15d9 is described below
commit f9a15d987e584076dae7dacba25c5f4102c97e37
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Mon Jan 20 13:27:55 2020 +0300
IGNITE-12463 Align checkpoint progress future state with the checkpoint state - Fixes #7153.
---
.../DiskPageCompressionIntegrationTest.java | 5 +-
.../internal/processors/cache/WalStateManager.java | 24 +-
.../dht/topology/GridDhtLocalPartition.java | 3 +-
...eckpointFuture.java => CheckpointProgress.java} | 17 +-
...{CheckpointFuture.java => CheckpointState.java} | 25 +--
.../GridCacheDatabaseSharedManager.java | 249 ++++++++-------------
.../IgniteCacheDatabaseSharedManager.java | 2 +-
.../processors/cache/verify/IdleVerifyUtility.java | 8 +-
.../persistence/db/wal/IgniteWalRecoveryTest.java | 3 +-
9 files changed, 138 insertions(+), 198 deletions(-)
diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java
index fb80054..fc1b566 100644
--- a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java
+++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java
@@ -49,6 +49,7 @@ import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.configuration.DataStorageConfiguration.MAX_PAGE_SIZE;
import static org.apache.ignite.configuration.DiskPageCompression.ZSTD;
import static org.apache.ignite.internal.processors.cache.CacheGroupMetricsImpl.CACHE_GROUP_METRICS_PREFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
/**
@@ -113,7 +114,7 @@ public class DiskPageCompressionIntegrationTest extends AbstractPageCompressionI
GridCacheDatabaseSharedManager dbMgr = ((GridCacheDatabaseSharedManager)ignite.context()
.cache().context().database());
- dbMgr.forceCheckpoint("test compression").finishFuture().get();
+ dbMgr.forceCheckpoint("test compression").futureFor(FINISHED).get();
FilePageStoreManager storeMgr = dbMgr.getFileStoreManager();
@@ -212,7 +213,7 @@ public class DiskPageCompressionIntegrationTest extends AbstractPageCompressionI
assertTrue(cache.putIfAbsent(i, new TestVal(i)));
if (i % 50_000 == 0) {
- dbMgr.forceCheckpoint("test").finishFuture().get();
+ dbMgr.forceCheckpoint("test").futureFor(FINISHED).get();
long sparse = mreg.<LongMetric>findMetric("SparseStorageSize").value();
long size = mreg.<LongMetric>findMetric("StorageSize").value();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
index 4f7ed20..044b702 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
@@ -42,7 +42,7 @@ import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.persistence.CheckpointFuture;
+import org.apache.ignite.internal.processors.cache.persistence.CheckpointProgress;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
@@ -69,6 +69,8 @@ import static org.apache.ignite.internal.GridTopic.TOPIC_WAL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
+import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
+import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.LOCK_RELEASED;
/**
* Write-ahead log state manager. Manages WAL enable and disable.
@@ -455,7 +457,7 @@ public class WalStateManager extends GridCacheSharedManagerAdapter {
try {
if (hasNonEmptyOwning && !grpsToEnableWal.isEmpty())
- triggerCheckpoint("wal-local-state-change-" + topVer).finishFuture().get();
+ triggerCheckpoint("wal-local-state-change-" + topVer).futureFor(FINISHED).get();
}
catch (IgniteCheckedException ex) {
throw new IgniteException(ex);
@@ -501,12 +503,12 @@ public class WalStateManager extends GridCacheSharedManagerAdapter {
// Pending updates in groups with disabled WAL are not protected from crash.
// Need to trigger checkpoint for attempt to persist them.
- CheckpointFuture cpFut = triggerCheckpoint("wal-local-state-changed-rebalance-finished-" + topVer);
+ CheckpointProgress cpFut = triggerCheckpoint("wal-local-state-changed-rebalance-finished-" + topVer);
assert cpFut != null;
// It's safe to switch partitions to owning state only if checkpoint was successfully finished.
- cpFut.finishFuture().listen(new IgniteInClosureX<IgniteInternalFuture>() {
+ cpFut.futureFor(FINISHED).listen(new IgniteInClosureX<IgniteInternalFuture>() {
@Override public void applyx(IgniteInternalFuture future) {
if (X.hasCause(future.error(), NodeStoppingException.class))
return;
@@ -671,12 +673,12 @@ public class WalStateManager extends GridCacheSharedManagerAdapter {
res = new WalStateResult(msg, false);
else {
// Initiate a checkpoint.
- CheckpointFuture cpFut = triggerCheckpoint("wal-state-change-grp-" + msg.groupId());
+ CheckpointProgress cpFut = triggerCheckpoint("wal-state-change-grp-" + msg.groupId());
if (cpFut != null) {
try {
// Wait for checkpoint mark synchronously before releasing the control.
- cpFut.beginFuture().get();
+ cpFut.futureFor(LOCK_RELEASED).get();
if (msg.enable()) {
grpCtx.globalWalEnabled(true);
@@ -1065,7 +1067,7 @@ public class WalStateManager extends GridCacheSharedManagerAdapter {
* @param msg Message.
* @return Checkpoint future or {@code null} if failed to get checkpointer.
*/
- @Nullable private CheckpointFuture triggerCheckpoint(String msg) {
+ @Nullable private CheckpointProgress triggerCheckpoint(String msg) {
return cctx.database().forceCheckpoint(msg);
}
@@ -1076,14 +1078,14 @@ public class WalStateManager extends GridCacheSharedManagerAdapter {
* @param msg Orignial message which triggered the process.
* @return Result.
*/
- private WalStateResult awaitCheckpoint(CheckpointFuture cpFut, WalStateProposeMessage msg) {
+ private WalStateResult awaitCheckpoint(CheckpointProgress cpFut, WalStateProposeMessage msg) {
WalStateResult res;
try {
assert msg.affinityNode();
if (cpFut != null)
- cpFut.finishFuture().get();
+ cpFut.futureFor(FINISHED).get();
res = new WalStateResult(msg, true);
}
@@ -1139,14 +1141,14 @@ public class WalStateManager extends GridCacheSharedManagerAdapter {
private final WalStateProposeMessage msg;
/** Checkpoint future. */
- private final CheckpointFuture cpFut;
+ private final CheckpointProgress cpFut;
/**
* Constructor.
*
* @param msg Propose message.
*/
- private WalStateChangeWorker(WalStateProposeMessage msg, CheckpointFuture cpFut) {
+ private WalStateChangeWorker(WalStateProposeMessage msg, CheckpointProgress cpFut) {
super(cctx.igniteInstanceName(), "wal-state-change-worker-" + msg.groupId(), WalStateManager.this.log);
this.msg = msg;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
index a45aa92..f6eb74a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
@@ -80,6 +80,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.topolo
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING;
+import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
/**
* Key partition.
@@ -1181,7 +1182,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
if (forceTestCheckpointOnEviction) {
if (partWhereTestCheckpointEnforced == null && cleared >= fullSize()) {
- ctx.database().forceCheckpoint("test").finishFuture().get();
+ ctx.database().forceCheckpoint("test").futureFor(FINISHED).get();
log.warning("Forced checkpoint by test reasons for partition: " + this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointProgress.java
similarity index 76%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointFuture.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointProgress.java
index 381bb55..52db794b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointProgress.java
@@ -20,16 +20,13 @@ package org.apache.ignite.internal.processors.cache.persistence;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
/**
- * Checkpoint futures.
+ * Represents information of a progress of a given checkpoint and
+ * allows to obtain future to wait for a particular checkpoint state.
*/
-public interface CheckpointFuture {
- /**
- * @return Begin future.
- */
- public GridFutureAdapter beginFuture();
+public interface CheckpointProgress {
+ /** */
+ public boolean inProgress();
- /**
- * @return Finish future.
- */
- public GridFutureAdapter<Object> finishFuture();
+ /** */
+ public GridFutureAdapter futureFor(CheckpointState state);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointState.java
similarity index 64%
rename from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointFuture.java
rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointState.java
index 381bb55..dbecf43 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointState.java
@@ -17,19 +17,18 @@
package org.apache.ignite.internal.processors.cache.persistence;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-
/**
- * Checkpoint futures.
+ * Possible checkpoint states. Ordinal is important. Every next state follows the previous one.
*/
-public interface CheckpointFuture {
- /**
- * @return Begin future.
- */
- public GridFutureAdapter beginFuture();
-
- /**
- * @return Finish future.
- */
- public GridFutureAdapter<Object> finishFuture();
+public enum CheckpointState {
+ /** Checkpoint is waiting to execution. **/
+ SCHEDULED,
+ /** Checkpoint was awakened and it is preparing to start. **/
+ LOCK_TAKEN,
+ /** Checkpoint counted the pages and write lock was released. **/
+ LOCK_RELEASED,
+ /** Checkpoint marker was stored to disk. **/
+ MARKER_STORED_TO_DISK,
+ /** Checkpoint was finished. **/
+ FINISHED
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index fe11489..8602fe1 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -207,10 +207,10 @@ import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.MASTER_KEY_CHANGE_RECORD;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.METASTORE_DATA_RECORD;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.fromOrdinal;
-import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.CheckpointProgress.State.FINISHED;
-import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.CheckpointProgress.State.LOCK_RELEASED;
-import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.CheckpointProgress.State.LOCK_TAKEN;
-import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.CheckpointProgress.State.MARKER_STORED_TO_DISK;
+import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
+import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.LOCK_RELEASED;
+import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.LOCK_TAKEN;
+import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.MARKER_STORED_TO_DISK;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_FILE_MATCHER;
import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getType;
import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getVersion;
@@ -1378,8 +1378,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
checkpointer = null;
- cp.scheduledCp.cpFinishFut.onDone(
- new NodeStoppingException("Checkpointer is stopped during node stop."));
+ cp.scheduledCp.fail(new NodeStoppingException("Checkpointer is stopped during node stop."));
break;
}
@@ -1646,7 +1645,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
failCheckpointReadLock();
try {
- checkpointer.wakeupForCheckpoint(0, "too many dirty pages").cpBeginFut
+ checkpointer.wakeupForCheckpoint(0, "too many dirty pages")
+ .futureFor(LOCK_RELEASED)
.getUninterruptibly();
}
catch (IgniteFutureTimeoutCheckedException e) {
@@ -1873,7 +1873,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
Checkpointer cp = checkpointer;
if (cp != null)
- return cp.wakeupForCheckpoint(0, reason).cpBeginFut;
+ return cp.wakeupForCheckpoint(0, reason).futureFor(LOCK_RELEASED);
return null;
}
@@ -1886,24 +1886,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
if (cp == null)
return;
- CheckpointProgressSnapshot progSnapshot = cp.wakeupForCheckpoint(0, reason, lsnr);
-
- IgniteInternalFuture fut1 = progSnapshot.cpFinishFut;
-
- fut1.get();
-
- if (!progSnapshot.started)
- return;
-
- IgniteInternalFuture fut2 = cp.wakeupForCheckpoint(0, reason).cpFinishFut;
-
- assert fut1 != fut2;
-
- fut2.get();
+ cp.wakeupForCheckpoint(0, reason, lsnr).futureFor(FINISHED).get();
}
/** {@inheritDoc} */
- @Override public CheckpointFuture forceCheckpoint(String reason) {
+ @Override public CheckpointProgress forceCheckpoint(String reason) {
Checkpointer cp = checkpointer;
if (cp == null)
@@ -2179,10 +2166,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
checkpointerThread = cpThread;
- CheckpointProgressSnapshot chp = checkpointer.wakeupForCheckpoint(0, "node started");
+ CheckpointProgress chp = checkpointer.wakeupForCheckpoint(0, "node started");
if (chp != null)
- chp.cpBeginFut.get();
+ chp.futureFor(LOCK_RELEASED).get();
}
/**
@@ -3522,10 +3509,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
private final ByteBuffer tmpWriteBuf;
/** Next scheduled checkpoint progress. */
- private volatile CheckpointProgress scheduledCp;
+ private volatile CheckpointProgressImpl scheduledCp;
/** Current checkpoint. This field is updated only by checkpoint thread. */
- @Nullable private volatile CheckpointProgress curCpProgress;
+ @Nullable private volatile CheckpointProgressImpl curCpProgress;
/** Shutdown now. */
private volatile boolean shutdownNow;
@@ -3548,7 +3535,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
protected Checkpointer(@Nullable String gridName, String name, IgniteLogger log) {
super(gridName, name, log, cctx.kernalContext().workersRegistry());
- scheduledCp = new CheckpointProgress(checkpointFreq);
+ scheduledCp = new CheckpointProgressImpl(checkpointFreq);
tmpWriteBuf = ByteBuffer.allocateDirect(pageSize());
@@ -3603,7 +3590,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
catch (Throwable t) {
err = t;
- scheduledCp.cpFinishFut.onDone(t);
+ scheduledCp.fail(t);
throw t;
}
@@ -3616,21 +3603,21 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
else if (err != null)
cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));
- scheduledCp.cpFinishFut.onDone(new NodeStoppingException("Node is stopping."));
+ scheduledCp.fail(new NodeStoppingException("Node is stopping."));
}
}
/**
*
*/
- private CheckpointProgressSnapshot wakeupForCheckpoint(long delayFromNow, String reason) {
+ private CheckpointProgress wakeupForCheckpoint(long delayFromNow, String reason) {
return wakeupForCheckpoint(delayFromNow, reason, null);
}
/**
*
*/
- private <R> CheckpointProgressSnapshot wakeupForCheckpoint(
+ private <R> CheckpointProgress wakeupForCheckpoint(
long delayFromNow,
String reason,
IgniteInClosure<? super IgniteInternalFuture<R>> lsnr
@@ -3638,20 +3625,18 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
if (lsnr != null) {
//To be sure lsnr always will be executed in checkpoint thread.
synchronized (this) {
- CheckpointProgress sched = scheduledCp;
+ CheckpointProgressImpl sched = scheduledCp;
- sched.cpFinishFut.listen(lsnr);
+ sched.futureFor(FINISHED).listen(lsnr);
}
}
- CheckpointProgress sched = scheduledCp;
+ CheckpointProgressImpl sched = scheduledCp;
long nextNanos = System.nanoTime() + U.millisToNanos(delayFromNow);
if (sched.nextCpNanos - nextNanos <= 0)
- return new CheckpointProgressSnapshot(sched);
-
- CheckpointProgressSnapshot ret;
+ return sched;
synchronized (this) {
sched = scheduledCp;
@@ -3662,12 +3647,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
sched.nextCpNanos = nextNanos;
}
- ret = new CheckpointProgressSnapshot(sched);
-
notifyAll();
}
- return ret;
+ return sched;
}
/**
@@ -3685,7 +3668,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
scheduledCp.snapshotOperation = snapshotOperation;
- ret = scheduledCp.cpBeginFut;
+ ret = scheduledCp.futureFor(LOCK_RELEASED);
notifyAll();
}
@@ -3707,7 +3690,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
catch (Exception e) {
if (curCpProgress != null)
- curCpProgress.cpFinishFut.onDone(e);
+ curCpProgress.fail(e);
// In case of checkpoint initialization error node should be invalidated and stopped.
cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
@@ -3794,7 +3777,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
// Must re-check shutdown flag here because threads may have skipped some pages.
// If so, we should not put finish checkpoint mark.
if (shutdownNow) {
- chp.progress.cpFinishFut.onDone(new NodeStoppingException("Node is stopping."));
+ chp.progress.fail(new NodeStoppingException("Node is stopping."));
return;
}
@@ -3804,7 +3787,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
if (!skipSync) {
for (Map.Entry<PageStore, LongAdder> updStoreEntry : updStores.entrySet()) {
if (shutdownNow) {
- chp.progress.cpFinishFut.onDone(new NodeStoppingException("Node is stopping."));
+ chp.progress.fail(new NodeStoppingException("Node is stopping."));
return;
}
@@ -3866,7 +3849,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
catch (IgniteCheckedException e) {
if (chp != null)
- chp.progress.cpFinishFut.onDone(e);
+ chp.progress.fail(e);
cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
}
@@ -4013,7 +3996,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
if (req != null)
req.waitCompleted();
- CheckpointProgress cur;
+ CheckpointProgressImpl cur;
synchronized (this) {
cur = curCpProgress;
@@ -4070,7 +4053,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
private Checkpoint markCheckpointBegin(CheckpointMetricsTracker tracker) throws IgniteCheckedException {
long cpTs = updateLastCheckpointTime();
- CheckpointProgress curr = scheduledCp;
+ CheckpointProgressImpl curr = scheduledCp;
CheckpointRecord cpRec = new CheckpointRecord(memoryRecoveryRecordPtr);
@@ -4123,7 +4106,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
fillCacheGroupState(cpRec);
//There are allowable to replace pages only after checkpoint entry was stored to disk.
- cpPagesTuple = beginAllCheckpoints(curr.cpMarkerStored);
+ cpPagesTuple = beginAllCheckpoints(curr.futureFor(MARKER_STORED_TO_DISK));
hasPages = hasPageForWrite(cpPagesTuple.get1());
@@ -4159,7 +4142,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
DbCheckpointListener.Context ctx = createOnCheckpointBeginContext(ctx0, hasPages);
- curr.cpBeginFut.onDone();
+ curr.transitTo(LOCK_RELEASED);
for (DbCheckpointListener lsnr : lsnrs)
lsnr.onCheckpointBegin(ctx);
@@ -4187,7 +4170,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
writeCheckpointEntry(tmpWriteBuf, cp, CheckpointEntryType.START);
- curr.cpMarkerStored.onDone();
+ curr.transitTo(MARKER_STORED_TO_DISK);
tracker.onSplitAndSortCpPagesStart();
@@ -4365,19 +4348,19 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
*
* @return Current checkpoint progress.
*/
- @NotNull private GridCacheDatabaseSharedManager.CheckpointProgress updateCurrentCheckpointProgress() {
- final CheckpointProgress curr;
+ @NotNull private CheckpointProgress updateCurrentCheckpointProgress() {
+ final CheckpointProgressImpl curr;
synchronized (this) {
curr = scheduledCp;
- curr.state(LOCK_TAKEN);
+ curr.transitTo(LOCK_TAKEN);
if (curr.reason == null)
curr.reason = "timeout";
// It is important that we assign a new progress object before checkpoint mark in page memory.
- scheduledCp = new CheckpointProgress(checkpointFreq);
+ scheduledCp = new CheckpointProgressImpl(checkpointFreq);
curCpProgress = curr;
}
@@ -4503,7 +4486,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
removeCheckpointFiles(cp);
if (chp.progress != null)
- chp.progress.cpFinishFut.onDone();
+ chp.progress.transitTo(FINISHED);
}
/** {@inheritDoc} */
@@ -4534,7 +4517,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
*/
private class DbCheckpointContextImpl implements DbCheckpointListener.Context {
/** Current checkpoint progress. */
- private final CheckpointProgress curr;
+ private final CheckpointProgressImpl curr;
/** Partition map. */
private final PartitionAllocationMap map;
@@ -4546,7 +4529,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
* @param curr Current checkpoint progress.
* @param map Partition map.
*/
- private DbCheckpointContextImpl(CheckpointProgress curr, PartitionAllocationMap map) {
+ private DbCheckpointContextImpl(CheckpointProgressImpl curr, PartitionAllocationMap map) {
this.curr = curr;
this.map = map;
this.pendingTaskFuture = asyncRunner == null ? null : new GridCompoundFuture();
@@ -4907,7 +4890,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
private final GridMultiCollectionWrapper<FullPageId> cpPages;
/** */
- private final CheckpointProgress progress;
+ private final CheckpointProgressImpl progress;
/** Number of deleted WAL files. */
private int walFilesDeleted;
@@ -4926,7 +4909,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
private Checkpoint(
@Nullable CheckpointEntry cpEntry,
@NotNull GridMultiCollectionWrapper<FullPageId> cpPages,
- CheckpointProgress progress
+ CheckpointProgressImpl progress
) {
this.cpEntry = cpEntry;
this.cpPages = cpPages;
@@ -5015,49 +4998,22 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/**
* Data class representing the state of running/scheduled checkpoint.
*/
- public static class CheckpointProgress {
+ public static class CheckpointProgressImpl implements CheckpointProgress {
/** Scheduled time of checkpoint. */
private volatile long nextCpNanos;
- /** Checkpoint begin phase future. TODO it should be encapsulated. */
- private GridFutureAdapter cpBeginFut = new GridFutureAdapter<Void>() {
- @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err, boolean cancel) {
- CheckpointProgress.this.state(LOCK_RELEASED);
-
- return super.onDone(res, err, cancel);
- }
- };
-
- /** Checkpoint marker stored to disk phase future. TODO it should be encapsulated. */
- private GridFutureAdapter cpMarkerStored = new GridFutureAdapter<Void>() {
- @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err, boolean cancel) {
- CheckpointProgress.this.state(MARKER_STORED_TO_DISK);
-
- return super.onDone(res, err, cancel);
- }
- };
-
- /** Checkpoint finish phase future. TODO it should be encapsulated. */
- private GridFutureAdapter cpFinishFut = new GridFutureAdapter<Void>() {
- @Override protected boolean onDone(@Nullable Void res, @Nullable Throwable err, boolean cancel) {
- if (err != null && !cpBeginFut.isDone())
- cpBeginFut.onDone(err);
-
- if (err != null && !cpMarkerStored.isDone())
- cpMarkerStored.onDone(err);
+ /** Current checkpoint state. */
+ private volatile AtomicReference<CheckpointState> state = new AtomicReference(CheckpointState.SCHEDULED);
- CheckpointProgress.this.state(FINISHED);
+ /** Future which would be finished when corresponds state is set. */
+ private final Map<CheckpointState, GridFutureAdapter> stateFutures = new ConcurrentHashMap<>();
- return super.onDone(res, err, cancel);
- }
- };
+ /** Cause of fail, which has happened during the checkpoint or null if checkpoint was successful. */
+ private volatile Throwable failCause;
/** Flag indicates that snapshot operation will be performed after checkpoint. */
private volatile boolean nextSnapshot;
- /** Current checkpoint state. */
- private volatile AtomicReference<State> state = new AtomicReference(State.SCHEDULED);
-
/** Snapshot operation that should be performed if {@link #nextSnapshot} set to true. */
private volatile SnapshotOperation snapshotOperation;
@@ -5070,94 +5026,79 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/**
* @param cpFreq Timeout until next checkpoint.
*/
- private CheckpointProgress(long cpFreq) {
+ private CheckpointProgressImpl(long cpFreq) {
this.nextCpNanos = System.nanoTime() + U.millisToNanos(cpFreq);
}
/**
- * @return {@code true} if checkpoint is stated.
- * @deprecated For legacy reason.
+ * @return {@code true} If checkpoint already started but have not finished yet.
*/
- @Deprecated
- public boolean inProgress() {
- return state.get().ordinal() >= State.LOCK_TAKEN.ordinal();
- }
-
- /** */
- public boolean started() {
- return cpBeginFut.isDone();
- }
-
- /** */
- public boolean finished() {
- return cpFinishFut.isDone();
+ @Override public boolean inProgress() {
+ return greaterOrEqualTo(LOCK_RELEASED) && !greaterOrEqualTo(FINISHED);
}
/**
* @param expectedState Expected state.
* @return {@code true} if current state equal to given state.
*/
- public boolean atLeastState(State expectedState) {
+ public boolean greaterOrEqualTo(CheckpointState expectedState) {
return state.get().ordinal() >= expectedState.ordinal();
}
/**
- * Changing checkpoint state if order of state is correct.
- *
- * @param newState New checkpoint state.
+ * @param state State for which future should be returned.
+ * @return Existed or new future which corresponds to the given state.
*/
- public void state(@NotNull State newState) {
- State state = this.state.get();
+ @Override public GridFutureAdapter futureFor(CheckpointState state) {
+ GridFutureAdapter stateFut = stateFutures.computeIfAbsent(state, (k) -> new GridFutureAdapter());
- if (state.ordinal() < newState.ordinal())
- this.state.compareAndSet(state, newState);
+ if (greaterOrEqualTo(state) && !stateFut.isDone())
+ stateFut.onDone(failCause);
+
+ return stateFut;
}
/**
- * Possible checkpoint states. Ordinal is important. Every next state follows the previous one.
+ * Mark this checkpoint execution as failed.
+ *
+ * @param error Causal error of fail.
*/
- enum State {
- /** Checkpoint is waiting to execution. **/
- SCHEDULED,
- /** Checkpoint was awakened and it is preparing to start. **/
- LOCK_TAKEN,
- /** Checkpoint counted the pages and write lock was released. **/
- LOCK_RELEASED,
- /** Checkpoint marker was stored to disk. **/
- MARKER_STORED_TO_DISK,
- /** Checkpoint was finished. **/
- FINISHED
- }
- }
+ public void fail(Throwable error) {
+ failCause = error;
- /**
- *
- */
- private static class CheckpointProgressSnapshot implements CheckpointFuture {
- /** */
- private final boolean started;
+ transitTo(FINISHED);
+ }
- /** */
- private final GridFutureAdapter<Object> cpBeginFut;
+ /**
+ * Changing checkpoint state if order of state is correct.
+ *
+ * @param newState New checkpoint state.
+ */
+ public void transitTo(@NotNull CheckpointState newState) {
+ CheckpointState state = this.state.get();
- /** */
- private final GridFutureAdapter<Object> cpFinishFut;
+ if (state.ordinal() < newState.ordinal()) {
+ this.state.compareAndSet(state, newState);
- /** */
- CheckpointProgressSnapshot(CheckpointProgress cpProgress) {
- started = cpProgress.inProgress();
- cpBeginFut = cpProgress.cpBeginFut;
- cpFinishFut = cpProgress.cpFinishFut;
+ doFinishFuturesWhichLessOrEqualTo(newState);
+ }
}
- /** {@inheritDoc} */
- @Override public GridFutureAdapter beginFuture() {
- return cpBeginFut;
- }
+ /**
+ * Finishing futures with correct result in direct state order until lastState(included).
+ *
+ * @param lastState State until which futures should be done.
+ */
+ private void doFinishFuturesWhichLessOrEqualTo(@NotNull CheckpointState lastState) {
+ for (CheckpointState old : CheckpointState.values()) {
+ GridFutureAdapter fut = stateFutures.get(old);
- /** {@inheritDoc} */
- @Override public GridFutureAdapter<Object> finishFuture() {
- return cpFinishFut;
+ if (fut != null && !fut.isDone())
+ fut.onDone(failCause);
+
+ if (old == lastState)
+ return;
+ }
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index 865a09e..bfdf7f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -926,7 +926,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
*
* @param reason Reason.
*/
- @Nullable public CheckpointFuture forceCheckpoint(String reason) {
+ @Nullable public CheckpointProgress forceCheckpoint(String reason) {
return null;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
index d386ec3..0e15efc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
@@ -24,6 +24,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.persistence.CheckpointProgress;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
@@ -104,13 +105,10 @@ public class IdleVerifyUtility {
if (!(db instanceof GridCacheDatabaseSharedManager))
return false;
- GridCacheDatabaseSharedManager.CheckpointProgress progress =
+ CheckpointProgress progress =
((GridCacheDatabaseSharedManager)db).getCheckpointer().currentProgress();
- if (progress == null)
- return false;
-
- return progress.started() && !progress.finished();
+ return progress != null && progress.inProgress();
}
/** */
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
index 65c1bed..40fdea7 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
@@ -117,6 +117,7 @@ import org.junit.Test;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS;
import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_CHECKPOINT_FREQ;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
+import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DATA_FILENAME;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
@@ -521,7 +522,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)ig2
.context().cache().context().database();
- IgniteInternalFuture<?> cpFinishFut = dbMgr.forceCheckpoint("force checkpoint").finishFuture();
+ IgniteInternalFuture<?> cpFinishFut = dbMgr.forceCheckpoint("force checkpoint").futureFor(FINISHED);
// Delete checkpoint END file to emulate node stopped at the middle of checkpoint.
cpFinishFut.listen(new IgniteInClosureX<IgniteInternalFuture>() {