You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in this plain-text rendering.)
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/11/20 13:06:07 UTC
[2/2] ignite git commit: IGNITE-9999 Added verbose logging for node recovery - Fixes #5371
IGNITE-9999 Added verbose logging for node recovery - Fixes #5371
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b48a291e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b48a291e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b48a291e
Branch: refs/heads/master
Commit: b48a291e1a2fc531085cba3b60ff8647ccc1639e
Parents: 74f312e
Author: Pavel Kovalenko <jo...@gmail.com>
Authored: Tue Nov 20 15:45:39 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Nov 20 15:56:59 2018 +0300
----------------------------------------------------------------------
.../apache/ignite/IgniteSystemProperties.java | 7 +
.../processors/cache/CacheGroupContext.java | 148 -------
.../processors/cache/GridCacheProcessor.java | 44 +-
.../cache/IgniteCacheOffheapManager.java | 11 +
.../cache/IgniteCacheOffheapManagerImpl.java | 7 +
.../GridDhtPartitionsExchangeFuture.java | 13 +-
.../persistence/DatabaseLifecycleListener.java | 43 +-
.../GridCacheDatabaseSharedManager.java | 425 +++++++++----------
.../persistence/GridCacheOffheapManager.java | 140 ++++++
.../IgniteCacheDatabaseSharedManager.java | 12 +-
.../cache/persistence/wal/FileWALPointer.java | 2 +-
.../wal/serializer/RecordDataV1Serializer.java | 2 +-
.../db/IgniteLogicalRecoveryTest.java | 284 ++++++++-----
.../file/IgnitePdsDiskErrorsRecoveringTest.java | 58 ---
.../IgniteNodeStoppedDuringDisableWALTest.java | 3 +-
.../persistence/db/wal/WalCompactionTest.java | 13 +-
16 files changed, 619 insertions(+), 593 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b48a291e/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 3f71642..ccf7ebf 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -1040,6 +1040,13 @@ public final class IgniteSystemProperties {
public static final String IGNITE_ALLOW_START_CACHES_IN_PARALLEL = "IGNITE_ALLOW_START_CACHES_IN_PARALLEL";
/**
+ * Allows to log additional information about all restored partitions after binary and logical recovery phases.
+ *
+ * Default is {@code true}.
+ */
+ public static final String IGNITE_RECOVERY_VERBOSE_LOGGING = "IGNITE_RECOVERY_VERBOSE_LOGGING";
+
+ /**
* Enforces singleton.
*/
private IgniteSystemProperties() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/b48a291e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index 95fc08f..fc4f79d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -22,7 +22,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -42,17 +41,11 @@ import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCach
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopologyImpl;
import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList;
-import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
-import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
-import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionRecoverState;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
import org.apache.ignite.internal.processors.cache.query.continuous.CounterSkipContext;
import org.apache.ignite.internal.processors.query.QueryUtils;
@@ -170,9 +163,6 @@ public class CacheGroupContext {
/** Flag indicates that cache group is under recovering and not attached to topology. */
private final AtomicBoolean recoveryMode;
- /** Flag indicates that all group partitions have restored their state from page memory / disk. */
- private volatile boolean partitionStatesRestored;
-
/**
* @param ctx Context.
* @param grpId Group ID.
@@ -791,144 +781,6 @@ public class CacheGroupContext {
}
/**
- * Pre-create partitions that resides in page memory or WAL and restores their state.
- */
- public long restorePartitionStates(Map<GroupPartitionId, PartitionRecoverState> partitionRecoveryStates) throws IgniteCheckedException {
- if (isLocal() || !affinityNode() || !dataRegion().config().isPersistenceEnabled())
- return 0;
-
- if (partitionStatesRestored)
- return 0;
-
- long processed = 0;
-
- PageMemoryEx pageMem = (PageMemoryEx)dataRegion().pageMemory();
-
- for (int p = 0; p < affinity().partitions(); p++) {
- PartitionRecoverState recoverState = partitionRecoveryStates.get(new GroupPartitionId(grpId, p));
-
- if (ctx.pageStore().exists(grpId, p)) {
- ctx.pageStore().ensure(grpId, p);
-
- if (ctx.pageStore().pages(grpId, p) <= 1) {
- if (log.isDebugEnabled())
- log.debug("Skipping partition on recovery (pages less than 1) " +
- "[grp=" + cacheOrGroupName() + ", p=" + p + "]");
-
- continue;
- }
-
- if (log.isDebugEnabled())
- log.debug("Creating partition on recovery (exists in page store) " +
- "[grp=" + cacheOrGroupName() + ", p=" + p + "]");
-
- processed++;
-
- GridDhtLocalPartition part = topology().forceCreatePartition(p);
-
- offheap().onPartitionInitialCounterUpdated(p, 0);
-
- ctx.database().checkpointReadLock();
-
- try {
- long partMetaId = pageMem.partitionMetaPageId(grpId, p);
- long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
-
- try {
- long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage);
-
- boolean changed = false;
-
- try {
- PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
-
- if (recoverState != null) {
- io.setPartitionState(pageAddr, (byte) recoverState.stateId());
-
- changed = updateState(part, recoverState.stateId());
-
- if (recoverState.stateId() == GridDhtPartitionState.OWNING.ordinal()
- || (recoverState.stateId() == GridDhtPartitionState.MOVING.ordinal()
- && part.initialUpdateCounter() < recoverState.updateCounter())) {
- part.initialUpdateCounter(recoverState.updateCounter());
-
- changed = true;
- }
-
- if (log.isInfoEnabled())
- log.warning("Restored partition state (from WAL) " +
- "[grp=" + cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
- ", updCntr=" + part.initialUpdateCounter() + "]");
- }
- else {
- int stateId = (int) io.getPartitionState(pageAddr);
-
- changed = updateState(part, stateId);
-
- if (log.isDebugEnabled())
- log.debug("Restored partition state (from page memory) " +
- "[grp=" + cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
- ", updCntr=" + part.initialUpdateCounter() + ", stateId=" + stateId + "]");
- }
- }
- finally {
- pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, changed);
- }
- }
- finally {
- pageMem.releasePage(grpId, partMetaId, partMetaPage);
- }
- }
- finally {
- ctx.database().checkpointReadUnlock();
- }
- }
- else if (recoverState != null) {
- GridDhtLocalPartition part = topology().forceCreatePartition(p);
-
- offheap().onPartitionInitialCounterUpdated(p, recoverState.updateCounter());
-
- updateState(part, recoverState.stateId());
-
- processed++;
-
- if (log.isDebugEnabled())
- log.debug("Restored partition state (from WAL) " +
- "[grp=" + cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
- ", updCntr=" + part.initialUpdateCounter() + "]");
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Skipping partition on recovery (no page store OR wal state) " +
- "[grp=" + cacheOrGroupName() + ", p=" + p + "]");
- }
- }
-
- partitionStatesRestored = true;
-
- return processed;
- }
-
- /**
- * @param part Partition to restore state for.
- * @param stateId State enum ordinal.
- * @return Updated flag.
- */
- private boolean updateState(GridDhtLocalPartition part, int stateId) {
- if (stateId != -1) {
- GridDhtPartitionState state = GridDhtPartitionState.fromOrdinal(stateId);
-
- assert state != null;
-
- part.restoreState(state == GridDhtPartitionState.EVICTED ? GridDhtPartitionState.RENTING : state);
-
- return true;
- }
-
- return false;
- }
-
- /**
* @return {@code True} if current cache group is in recovery mode.
*/
public boolean isRecoveryMode() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/b48a291e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 23c3623..ce81468 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -106,6 +106,8 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStor
import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionRecoverState;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
@@ -5438,6 +5440,15 @@ public class GridCacheProcessor extends GridProcessorAdapter {
private final Map<Integer, QuerySchema> querySchemas = new ConcurrentHashMap<>();
/** {@inheritDoc} */
+ @Override public void onBaselineChange() {
+ onKernalStopCaches(true);
+
+ stopCaches(true);
+
+ sharedCtx.database().cleanupRestoredCaches();
+ }
+
+ /** {@inheritDoc} */
@Override public void onReadyForRead(ReadOnlyMetastorage metastorage) throws IgniteCheckedException {
restoreCacheConfigurations();
}
@@ -5449,13 +5460,44 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/** {@inheritDoc} */
- @Override public void afterBinaryMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {
+ @Override public void afterBinaryMemoryRestore(GridCacheDatabaseSharedManager.RestoreBinaryState binaryState) throws IgniteCheckedException {
for (DynamicCacheDescriptor cacheDescriptor : persistentCaches()) {
startCacheInRecoveryMode(cacheDescriptor);
querySchemas.put(cacheDescriptor.cacheId(), cacheDescriptor.schema().copy());
}
}
+
+ /** {@inheritDoc} */
+ @Override public void afterLogicalUpdatesApplied(GridCacheDatabaseSharedManager.RestoreLogicalState logicalState) throws IgniteCheckedException {
+ restorePartitionStates(cacheGroups(), logicalState.partitionRecoveryStates());
+ }
+
+ /**
+ * @param forGroups Cache groups.
+ * @param partitionStates Partition states.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void restorePartitionStates(
+ Collection<CacheGroupContext> forGroups,
+ Map<GroupPartitionId, PartitionRecoverState> partitionStates
+ ) throws IgniteCheckedException {
+ long startRestorePart = U.currentTimeMillis();
+
+ if (log.isInfoEnabled())
+ log.info("Restoring partition state for local groups.");
+
+ long totalProcessed = 0;
+
+ for (CacheGroupContext grp : forGroups)
+ totalProcessed += grp.offheap().restorePartitionStates(partitionStates);
+
+ if (log.isInfoEnabled())
+ log.info("Finished restoring partition state for local groups [" +
+ "groupsProcessed=" + forGroups.size() +
+ ", partitionsProcessed=" + totalProcessed +
+ ", time=" + (U.currentTimeMillis() - startRestorePart) + "ms]");
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/b48a291e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 2cf302f..db09a89 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -31,6 +31,8 @@ import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
import org.apache.ignite.internal.processors.cache.persistence.RootPage;
import org.apache.ignite.internal.processors.cache.persistence.RowStore;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionRecoverState;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccUpdateResult;
@@ -82,6 +84,15 @@ public interface IgniteCacheOffheapManager {
public void stop();
/**
+ * Pre-create partitions that resides in page memory or WAL and restores their state.
+ *
+ * @param partitionRecoveryStates Partition recovery states.
+ * @return Number of processed partitions.
+ * @throws IgniteCheckedException If failed.
+ */
+ long restorePartitionStates(Map<GroupPartitionId, PartitionRecoverState> partitionRecoveryStates) throws IgniteCheckedException;
+
+ /**
* Partition counter update callback. May be overridden by plugin-provided subclasses.
*
* @param part Partition.
http://git-wip-us.apache.org/repos/asf/ignite/blob/b48a291e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 08ce978..6835795 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -60,6 +60,8 @@ import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapt
import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
import org.apache.ignite.internal.processors.cache.persistence.RootPage;
import org.apache.ignite.internal.processors.cache.persistence.RowStore;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionRecoverState;
import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
@@ -258,6 +260,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
+ @Override public long restorePartitionStates(Map<GroupPartitionId, PartitionRecoverState> partitionRecoveryStates) throws IgniteCheckedException {
+ return 0; // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public void onKernalStop() {
busyLock.block();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b48a291e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 44fc266..2e792f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -92,6 +92,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.Grid
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsStateValidator;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
+import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -890,15 +891,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
cctx.exchange().exchangerBlockingSectionBegin();
try {
- // Stop all recovered caches and groups.
- cctx.cache().onKernalStopCaches(true);
+ List<DatabaseLifecycleListener> listeners = cctx.kernalContext().internalSubscriptionProcessor()
+ .getDatabaseListeners();
- cctx.cache().stopCaches(true);
-
- cctx.database().cleanupRestoredCaches();
-
- // Set initial node started marker.
- cctx.database().nodeStart(null);
+ for (DatabaseLifecycleListener lsnr : listeners)
+ lsnr.onBaselineChange();
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
http://git-wip-us.apache.org/repos/asf/ignite/blob/b48a291e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DatabaseLifecycleListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DatabaseLifecycleListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DatabaseLifecycleListener.java
index 1f7ba84..6762109 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DatabaseLifecycleListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DatabaseLifecycleListener.java
@@ -22,6 +22,7 @@ import org.apache.ignite.IgniteCheckedException;
/**
*
*/
+@SuppressWarnings("RedundantThrows")
public interface DatabaseLifecycleListener {
/**
* Callback executed when data regions become to start-up.
@@ -29,7 +30,15 @@ public interface DatabaseLifecycleListener {
* @param mgr Database shared manager.
* @throws IgniteCheckedException If failed.
*/
- default void onInitDataRegions(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {};
+ public default void onInitDataRegions(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {}
+
+ /**
+ * Callback executed when node detected that baseline topology is changed and node is not in that baseline.
+ * It's useful to cleanup and invalidate all available data restored at that moment.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ public default void onBaselineChange() throws IgniteCheckedException {}
/**
* Callback executed right before node become perform binary recovery.
@@ -37,30 +46,46 @@ public interface DatabaseLifecycleListener {
* @param mgr Database shared manager.
* @throws IgniteCheckedException If failed.
*/
- default void beforeBinaryMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {};
+ public default void beforeBinaryMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {}
/**
* Callback executed when binary memory has fully restored and WAL logging is resumed.
*
- * @param mgr Database shared manager.
+ * @param binaryState Result of binary recovery.
* @throws IgniteCheckedException If failed.
*/
- default void afterBinaryMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {};
+ public default void afterBinaryMemoryRestore(GridCacheDatabaseSharedManager.RestoreBinaryState binaryState)
+ throws IgniteCheckedException {}
/**
+ * Callback executed when all logical updates were applied and page memory become to fully consistent state.
*
- * @param mgr
- * @throws IgniteCheckedException
+ * @param logicalState Result of logical recovery.
+ * @throws IgniteCheckedException If failed.
*/
- default void beforeResumeWalLogging(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {};
+ public default void afterLogicalUpdatesApplied(GridCacheDatabaseSharedManager.RestoreLogicalState logicalState)
+ throws IgniteCheckedException {}
/**
+ * Callback executed when all physical updates are applied and we are ready to write new physical records
+ * during logical recovery.
+ *
* @param mgr Database shared manager.
+ * @throws IgniteCheckedException If failed.
*/
- default void afterInitialise(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {};
+ public default void beforeResumeWalLogging(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {}
/**
+ * Callback executed after all data regions are initialized.
+ *
+ * @param mgr Database shared manager.
+ */
+ public default void afterInitialise(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {}
+
+ /**
+ * Callback executed before shared manager will be stopped.
+ *
* @param mgr Database shared manager.
*/
- default void beforeStop(IgniteCacheDatabaseSharedManager mgr) {};
+ public default void beforeStop(IgniteCacheDatabaseSharedManager mgr) {}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b48a291e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 019d1aa..c74972a 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -93,7 +93,6 @@ import org.apache.ignite.internal.pagemem.wal.record.CacheState;
import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
-import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord;
import org.apache.ignite.internal.pagemem.wal.record.MetastoreDataRecord;
import org.apache.ignite.internal.pagemem.wal.record.MvccDataEntry;
import org.apache.ignite.internal.pagemem.wal.record.MvccTxRecord;
@@ -133,6 +132,7 @@ import org.apache.ignite.internal.processors.cache.persistence.partstate.Partiti
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperation;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
import org.apache.ignite.internal.processors.port.GridPortRecord;
@@ -153,6 +153,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteOutClosure;
@@ -167,7 +168,6 @@ import org.jsr166.ConcurrentLinkedHashMap;
import static java.nio.file.StandardOpenOption.READ;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_READ_LOCK_TIMEOUT;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
@@ -192,9 +192,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
private final boolean skipSync = IgniteSystemProperties.getBoolean(IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC);
/** */
- private boolean skipCrc = IgniteSystemProperties.getBoolean(IGNITE_PDS_SKIP_CRC, false);
-
- /** */
private final int walRebalanceThreshold = IgniteSystemProperties.getInteger(
IGNITE_PDS_WAL_REBALANCE_THRESHOLD, 500_000);
@@ -218,9 +215,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** Checkpoint file name pattern. */
public static final Pattern CP_FILE_NAME_PATTERN = Pattern.compile("(\\d+)-(.*)-(START|END)\\.bin");
- /** Node started file suffix. */
- public static final String NODE_STARTED_FILE_NAME_SUFFIX = "-node-started.bin";
-
/** */
private static final String MBEAN_NAME = "DataStorageMetrics";
@@ -362,6 +356,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** Timeout for checkpoint read lock acquisition in milliseconds. */
private volatile long checkpointReadLockTimeout;
+ /** Flag allows to log additional information about partitions during recovery phases. */
+ private final boolean recoveryVerboseLogging = IgniteSystemProperties.getBoolean(
+ IgniteSystemProperties.IGNITE_RECOVERY_VERBOSE_LOGGING, true);
+
/**
* @param ctx Kernal context.
*/
@@ -484,6 +482,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
final GridKernalContext kernalCtx = cctx.kernalContext();
if (!kernalCtx.clientNode()) {
+ kernalCtx.internalSubscriptionProcessor().registerDatabaseListener(new MetastorageRecoveryLifecycle());
+
checkpointer = new Checkpointer(cctx.igniteInstanceName(), "db-checkpoint-thread", log);
cpHistory = new CheckpointHistory(kernalCtx);
@@ -543,7 +543,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
continue;
if (log.isInfoEnabled())
- log.info("Page memory " + region + " for " + grpDesc + " has invalidated.");
+ log.info("Page memory " + region.config().getName() + " for " + grpDesc + " has invalidated.");
int partitions = grpDesc.config().getAffinity().partitions();
@@ -895,31 +895,40 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
* @return Last seen WAL pointer during binary memory recovery.
* @throws IgniteCheckedException If failed.
*/
- private WALPointer restoreBinaryMemory(Predicate<Integer> cacheGroupsPredicate) throws IgniteCheckedException {
- assert !cctx.kernalContext().clientNode();
-
+ private RestoreBinaryState restoreBinaryMemory(Predicate<Integer> cacheGroupsPredicate) throws IgniteCheckedException {
long time = System.currentTimeMillis();
- checkpointReadLock();
-
try {
+ log.info("Starting binary memory restore for: " + cctx.cache().cacheGroupDescriptors().keySet());
+
+ for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext()))
+ lsnr.beforeBinaryMemoryRestore(this);
+
CheckpointStatus status = readCheckpointStatus();
// First, bring memory to the last consistent checkpoint state if needed.
// This method should return a pointer to the last valid record in the WAL.
- WALPointer restored = performBinaryMemoryRestore(status, cacheGroupsPredicate, true);
+ RestoreBinaryState binaryState = performBinaryMemoryRestore(status, cacheGroupsPredicate, true);
+
+ WALPointer restored = binaryState.lastReadRecordPointer();
if (restored == null && !status.endPtr.equals(CheckpointStatus.NULL_PTR)) {
throw new StorageException("The memory cannot be restored. The critical part of WAL archive is missing " +
"[tailWalPtr=" + restored + ", endPtr=" + status.endPtr + ']');
}
+ else if (restored != null)
+ U.log(log, "Binary memory state restored at node startup [restoredPtr=" + restored + ']');
- nodeStart(restored);
+ // Wal logging is now available.
+ cctx.wal().resumeLogging(restored);
+
+ for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext()))
+ lsnr.afterBinaryMemoryRestore(binaryState);
if (log.isInfoEnabled())
log.info("Binary recovery performed in " + (System.currentTimeMillis() - time) + " ms.");
- return restored;
+ return binaryState;
}
catch (IgniteCheckedException e) {
if (X.hasCause(e, StorageException.class, IOException.class))
@@ -927,97 +936,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
throw e;
}
- finally {
- checkpointReadUnlock();
- }
- }
-
- /** {@inheritDoc} */
- @Override public void nodeStart(@Nullable WALPointer ptr) throws IgniteCheckedException {
- FileWALPointer p = (FileWALPointer)(ptr == null ? CheckpointStatus.NULL_PTR : ptr);
-
- String fileName = U.currentTimeMillis() + NODE_STARTED_FILE_NAME_SUFFIX;
- String tmpFileName = fileName + FilePageStoreManager.TMP_SUFFIX;
-
- ByteBuffer buf = ByteBuffer.allocate(FileWALPointer.POINTER_SIZE);
- buf.order(ByteOrder.nativeOrder());
-
- try {
- try (FileIO io = ioFactory.create(Paths.get(cpDir.getAbsolutePath(), tmpFileName).toFile(),
- StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
- buf.putLong(p.index());
-
- buf.putInt(p.fileOffset());
-
- buf.putInt(p.length());
-
- buf.flip();
-
- io.writeFully(buf);
-
- buf.clear();
-
- io.force(true);
- }
-
- Files.move(Paths.get(cpDir.getAbsolutePath(), tmpFileName), Paths.get(cpDir.getAbsolutePath(), fileName));
- }
- catch (IOException e) {
- throw new StorageException("Failed to write node start marker: " + ptr, e);
- }
- }
-
- /**
- * Collects memory recovery pointers from node started files. See {@link #nodeStart(WALPointer)}.
- * Each pointer associated with timestamp extracted from file.
- * Tuples are sorted by timestamp.
- *
- * @return Sorted list of tuples (node started timestamp, memory recovery pointer).
- *
- * @throws IgniteCheckedException If failed.
- */
- public List<T2<Long, WALPointer>> nodeStartedPointers() throws IgniteCheckedException {
- List<T2<Long, WALPointer>> res = new ArrayList<>();
-
- try (DirectoryStream<Path> nodeStartedFiles = Files.newDirectoryStream(
- cpDir.toPath(),
- path -> path.toFile().getName().endsWith(NODE_STARTED_FILE_NAME_SUFFIX))
- ) {
- ByteBuffer buf = ByteBuffer.allocate(FileWALPointer.POINTER_SIZE);
- buf.order(ByteOrder.nativeOrder());
-
- for (Path path : nodeStartedFiles) {
- File f = path.toFile();
-
- String name = f.getName();
-
- Long ts = Long.valueOf(name.substring(0, name.length() - NODE_STARTED_FILE_NAME_SUFFIX.length()));
-
- try (FileIO io = ioFactory.create(f, READ)) {
- io.readFully(buf);
-
- buf.flip();
-
- FileWALPointer ptr = new FileWALPointer(
- buf.getLong(), buf.getInt(), buf.getInt());
-
- res.add(new T2<>(ts, ptr));
-
- buf.clear();
- }
- catch (IOException e) {
- throw new StorageException("Failed to read node started marker file: " + f.getAbsolutePath(), e);
- }
- }
- }
- catch (IOException e) {
- throw new StorageException("Failed to retreive node started files.", e);
- }
-
- // Sort start markers by file timestamp.
- res.sort(Comparator.comparingLong(IgniteBiTuple::get1));
-
- return res;
}
/** {@inheritDoc} */
@@ -1357,7 +1275,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
cctx.database().checkpointReadLock();
try {
- cacheGroup.restorePartitionStates(Collections.emptyMap());
+ cacheGroup.offheap().restorePartitionStates(Collections.emptyMap());
if (cacheGroup.localStartVersion().equals(fut.initialVersion()))
cacheGroup.topology().afterStateRestored(fut.initialVersion());
@@ -1504,14 +1422,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
for (IgniteBiTuple<CacheGroupContext, Boolean> tup : stoppedGrps) {
CacheGroupContext grp = tup.get1();
- if (grp.affinityNode()) {
- try {
- cctx.pageStore().shutdownForCacheGroup(grp, tup.get2());
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to gracefully clean page store resources for destroyed cache " +
- "[cache=" + grp.cacheOrGroupName() + "]", e);
- }
+ try {
+ cctx.pageStore().shutdownForCacheGroup(grp, tup.get2());
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to gracefully clean page store resources for destroyed cache " +
+ "[cache=" + grp.cacheOrGroupName() + "]", e);
}
}
}
@@ -1978,32 +1894,16 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
// Preform early regions startup before restoring state.
initAndStartRegions(kctx.config().getDataStorageConfiguration());
- for (DatabaseLifecycleListener lsnr : getDatabaseListeners(kctx))
- lsnr.beforeBinaryMemoryRestore(this);
-
- log.info("Starting binary memory restore for: " + cctx.cache().cacheGroupDescriptors().keySet());
-
- cctx.pageStore().initializeForMetastorage();
-
// Restore binary memory for all not WAL disabled cache groups.
- WALPointer restored = restoreBinaryMemory(
+ restoreBinaryMemory(
g -> !initiallyGlobalWalDisabledGrps.contains(g) && !initiallyLocalWalDisabledGrps.contains(g)
);
- if (restored != null)
- U.log(log, "Binary memory state restored at node startup [restoredPtr=" + restored + ']');
-
- for (DatabaseLifecycleListener lsnr : getDatabaseListeners(kctx))
- lsnr.afterBinaryMemoryRestore(this);
+ if (recoveryVerboseLogging && log.isInfoEnabled()) {
+ log.info("Partition states information after BINARY RECOVERY phase:");
- cctx.wal().resumeLogging(restored);
-
- // We should log this record to ensure that node start marker pointer will be found in compacted segment.
- cctx.wal().log(new MemoryRecoveryRecord(System.currentTimeMillis()));
-
- assert metaStorage == null;
-
- metaStorage = createMetastorage(false);
+ dumpPartitionsInfo(cctx, log);
+ }
CheckpointStatus status = readCheckpointStatus();
@@ -2014,8 +1914,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
false
);
- // Restore state for all groups.
- restorePartitionStates(cctx.cache().cacheGroups(), logicalState.partitionRecoveryStates);
+ if (recoveryVerboseLogging && log.isInfoEnabled()) {
+ log.info("Partition states information after LOGICAL RECOVERY phase:");
+
+ dumpPartitionsInfo(cctx, log);
+ }
walTail = tailPointer(logicalState.lastRead);
@@ -2056,32 +1959,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/**
- * @param forGroups Cache groups.
- * @param partitionStates Partition states.
- * @throws IgniteCheckedException If failed.
- */
- private void restorePartitionStates(
- Collection<CacheGroupContext> forGroups,
- Map<GroupPartitionId, PartitionRecoverState> partitionStates
- ) throws IgniteCheckedException {
- long startRestorePart = U.currentTimeMillis();
-
- if (log.isInfoEnabled())
- log.info("Restoring partition state for local groups.");
-
- long totalProcessed = 0;
-
- for (CacheGroupContext grp : forGroups)
- totalProcessed += grp.restorePartitionStates(partitionStates);
-
- if (log.isInfoEnabled())
- log.info("Finished restoring partition state for local groups [" +
- "groupsProcessed" + forGroups.size() +
- "partitionsProcessed=" + totalProcessed +
- ", time=" + (U.currentTimeMillis() - startRestorePart) + "ms]");
- }
-
- /**
* Called when all partitions have been fully restored and pre-created on node start.
*
* Starts checkpointing process and initiates first checkpoint.
@@ -2108,7 +1985,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
* @throws IgniteCheckedException If failed.
* @throws StorageException In case I/O error occurred during operations with storage.
*/
- private WALPointer performBinaryMemoryRestore(
+ private RestoreBinaryState performBinaryMemoryRestore(
CheckpointStatus status,
Predicate<Integer> cacheGroupsPredicate,
boolean finalizeState
@@ -2257,7 +2134,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
if (!finalizeState)
return null;
- WALPointer lastReadPtr = restoreBinaryState.lastReadRecordPointer();
+ FileWALPointer lastReadPtr = restoreBinaryState.lastReadRecordPointer();
if (status.needRestoreMemory()) {
if (restoreBinaryState.needApplyBinaryUpdate())
@@ -2274,7 +2151,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
cpHistory.initialize(retreiveHistory());
- return lastReadPtr != null ? lastReadPtr.next() : null;
+ // Move pointer position to the end of last read record.
+ restoreBinaryState.lastRead = lastReadPtr != null ? lastReadPtr.next() : lastReadPtr;
+
+ return restoreBinaryState;
}
/**
@@ -2311,61 +2191,61 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
* @param it WalIterator.
* @param recPredicate Wal record filter.
* @param entryPredicate Entry filter.
- * @param partitionRecoveryStates Partition to restore state.
*/
public void applyUpdatesOnRecovery(
@Nullable WALIterator it,
- IgnitePredicate<IgniteBiTuple<WALPointer, WALRecord>> recPredicate,
- IgnitePredicate<DataEntry> entryPredicate,
- Map<GroupPartitionId, PartitionRecoverState> partitionRecoveryStates
+ IgniteBiPredicate<WALPointer, WALRecord> recPredicate,
+ IgnitePredicate<DataEntry> entryPredicate
) throws IgniteCheckedException {
+ if (it == null)
+ return;
+
cctx.walState().runWithOutWAL(() -> {
- if (it != null) {
- while (it.hasNext()) {
- IgniteBiTuple<WALPointer, WALRecord> next = it.next();
+ while (it.hasNext()) {
+ IgniteBiTuple<WALPointer, WALRecord> next = it.next();
- WALRecord rec = next.get2();
+ WALRecord rec = next.get2();
- if (!recPredicate.apply(next))
- break;
+ if (!recPredicate.apply(next.get1(), rec))
+ break;
- switch (rec.type()) {
- case MVCC_DATA_RECORD:
+ switch (rec.type()) {
+ case MVCC_DATA_RECORD:
case DATA_RECORD:
- checkpointReadLock();
+ checkpointReadLock();
- try {
- DataRecord dataRec = (DataRecord)rec;
+ try {
+ DataRecord dataRec = (DataRecord)rec;
- for (DataEntry dataEntry : dataRec.writeEntries()) {
- if (entryPredicate.apply(dataEntry)) {
- checkpointReadLock();
+ for (DataEntry dataEntry : dataRec.writeEntries()) {
+ if (entryPredicate.apply(dataEntry)) {
+ checkpointReadLock();
- try {
- int cacheId = dataEntry.cacheId();
+ try {
+ int cacheId = dataEntry.cacheId();
- GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+ GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
- if (cacheCtx != null)
- applyUpdate(cacheCtx, dataEntry);
- else if (log != null)
- log.warning("Cache is not started. Updates cannot be applied " +
- "[cacheId=" + cacheId + ']');
- }
- finally {
- checkpointReadUnlock();
- }
+ if (cacheCtx != null)
+ applyUpdate(cacheCtx, dataEntry);
+ else if (log != null)
+ log.warning("Cache is not started. Updates cannot be applied " +
+ "[cacheId=" + cacheId + ']');
+ }
+ finally {
+ checkpointReadUnlock();
}
}
}
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- finally {
- checkpointReadUnlock();
- }
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ finally {
+ checkpointReadUnlock();
+ }
- break;
+ break;
case MVCC_TX_RECORD:
checkpointReadLock();
@@ -2386,23 +2266,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
break;
- default:
- // Skip other records.
- }
+ default:
+ // Skip other records.
}
}
-
- checkpointReadLock();
-
- try {
- restorePartitionStates(cctx.cache().cacheGroups(), partitionRecoveryStates);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- finally {
- checkpointReadUnlock();
- }
});
}
@@ -2538,6 +2405,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
log.info("Finished applying WAL changes [updatesApplied=" + applied +
", time=" + (U.currentTimeMillis() - start) + " ms]");
+ for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext()))
+ lsnr.afterLogicalUpdatesApplied(restoreLogicalState);
+
return restoreLogicalState;
}
@@ -4223,12 +4093,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/**
*
*/
- private static class CheckpointStatus {
+ public static class CheckpointStatus {
/** Null checkpoint UUID. */
private static final UUID NULL_UUID = new UUID(0L, 0L);
/** Null WAL pointer. */
- private static final WALPointer NULL_PTR = new FileWALPointer(0, 0, 0);
+ public static final WALPointer NULL_PTR = new FileWALPointer(0, 0, 0);
/** */
private long cpStartTs;
@@ -4693,6 +4563,97 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/**
+ * Logs partition meta information for each cache group returned by {@code cctx.cache().cacheGroups()}
+ * that is neither local nor non-persistent. The per-group output is produced by
+ * {@link #dumpPartitionsInfo(CacheGroupContext, IgniteLogger)}.
+ *
+ * @param cctx Shared cache context used to enumerate cache groups.
+ * @param log Logger receiving the dump.
+ * @throws IgniteCheckedException If dumping partition information for any group fails.
+ */
+ private static void dumpPartitionsInfo(GridCacheSharedContext cctx, IgniteLogger log) throws IgniteCheckedException {
+ for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+ // Only non-local groups with persistence enabled are inspected.
+ boolean dumpable = !grp.isLocal() && grp.persistenceEnabled();
+
+ if (dumpable)
+ dumpPartitionsInfo(grp, log);
+ }
+ }
+
+ /**
+ * Reads the partition meta page of every {@code grp} partition that has a page store file
+ * and logs its state, update counter and size at INFO level.
+ * Partition files holding no more than one page are reported with state {@code N/A (only file header)}.
+ *
+ * @param grp Cache group whose partitions should be dumped.
+ * @param log Logger receiving the dump.
+ * @throws IgniteCheckedException If page store access or page acquisition fails.
+ */
+ private static void dumpPartitionsInfo(CacheGroupContext grp, IgniteLogger log) throws IgniteCheckedException {
+ PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
+
+ IgnitePageStoreManager pageStore = grp.shared().pageStore();
+
+ assert pageStore != null : "Persistent cache should have initialize page store manager.";
+
+ // Loop invariants.
+ int grpId = grp.groupId();
+ int partitions = grp.affinity().partitions();
+
+ for (int p = 0; p < partitions; p++) {
+ if (!pageStore.exists(grpId, p))
+ continue;
+
+ // NOTE(review): ensure() is called before pages(), mirroring partition state restore - confirm it is required.
+ pageStore.ensure(grpId, p);
+
+ if (pageStore.pages(grpId, p) <= 1) {
+ // Include the group name so the line can be attributed when several groups are dumped.
+ log.info("Partition [grp=" + grp.cacheOrGroupName() + ", id=" + p + ", state=N/A (only file header)]");
+
+ continue;
+ }
+
+ long partMetaId = pageMem.partitionMetaPageId(grpId, p);
+ long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
+
+ try {
+ long pageAddr = pageMem.readLock(grpId, partMetaId, partMetaPage);
+
+ try {
+ PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
+
+ GridDhtPartitionState partitionState = GridDhtPartitionState.fromOrdinal(io.getPartitionState(pageAddr));
+
+ // fromOrdinal() may yield null; report it as N/A rather than failing the dump.
+ String state = partitionState != null ? partitionState.toString() : "N/A";
+
+ long updateCounter = io.getUpdateCounter(pageAddr);
+ long size = io.getSize(pageAddr);
+
+ log.info("Partition [grp=" + grp.cacheOrGroupName()
+ + ", id=" + p
+ + ", state=" + state
+ + ", counter=" + updateCounter
+ + ", size=" + size + "]");
+ }
+ finally {
+ pageMem.readUnlock(grpId, partMetaId, partMetaPage);
+ }
+ }
+ finally {
+ pageMem.releasePage(grpId, partMetaId, partMetaPage);
+ }
+ }
+ }
+
+ /**
+ * Database lifecycle listener for the read-write metastorage.
+ * <ul>
+ * <li>Before binary memory restore, it initializes the metastorage page store.</li>
+ * <li>After binary memory restore, it creates the metastorage instance.</li>
+ * </ul>
+ */
+ private class MetastorageRecoveryLifecycle implements DatabaseLifecycleListener {
+ /** {@inheritDoc} */
+ @Override public void beforeBinaryMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {
+ cctx.pageStore().initializeForMetastorage();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void afterBinaryMemoryRestore(RestoreBinaryState binaryState) throws IgniteCheckedException {
+ // No metastorage instance may exist yet at this point.
+ assert metaStorage == null : "Metastorage has already been created";
+
+ // NOTE(review): 'false' presumably selects read-write (not read-only) mode - confirm against createMetastorage.
+ metaStorage = createMetastorage(false);
+ }
+ }
+
+ /**
* Abstract class for create restore context.
*/
private abstract class RestoreStateContext {
@@ -4764,12 +4725,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
// Filter data entries by group id.
List<DataEntry> filteredEntries = dataRecord.writeEntries().stream()
.filter(entry -> {
- if (entry == null)
- return false;
-
int cacheId = entry.cacheId();
- return cctx != null && cctx.cacheContext(cacheId) != null && cacheGroupPredicate.test(cctx.cacheContext(cacheId).groupId());
+ return cctx.cacheContext(cacheId) != null && cacheGroupPredicate.test(cctx.cacheContext(cacheId).groupId());
})
.collect(Collectors.toList());
@@ -4805,7 +4763,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
*
* @return Last read WAL record pointer.
*/
- public WALPointer lastReadRecordPointer() {
+ public FileWALPointer lastReadRecordPointer() {
return lastRead;
}
@@ -4823,7 +4781,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/**
* Restore memory context. Tracks the safety of binary recovery.
*/
- private class RestoreBinaryState extends RestoreStateContext {
+ public class RestoreBinaryState extends RestoreStateContext {
/** Checkpoint status. */
private final CheckpointStatus status;
@@ -4898,7 +4856,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/**
* Restore logical state context. Tracks the safety of logical recovery.
*/
- private class RestoreLogicalState extends RestoreStateContext {
+ public class RestoreLogicalState extends RestoreStateContext {
/** States of partitions recovered during applying logical updates. */
private final Map<GroupPartitionId, PartitionRecoverState> partitionRecoveryStates = new HashMap<>();
@@ -4908,6 +4866,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
public RestoreLogicalState(long lastArchivedSegment, Predicate<Integer> cacheGroupsPredicate) {
super(lastArchivedSegment, cacheGroupsPredicate, false);
}
+
+ /**
+ * Exposes the partition states collected while logical updates were applied.
+ *
+ * @return Read-only view of the restored partition states, keyed by group/partition id.
+ */
+ public Map<GroupPartitionId, PartitionRecoverState> partitionRecoveryStates() {
+ return Collections.unmodifiableMap(this.partitionRecoveryStates);
+ }
}
/** Indicates checkpoint read lock acquisition failure which did not lead to node invalidation. */
http://git-wip-us.apache.org/repos/asf/ignite/blob/b48a291e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index f24900f..73cb878 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -71,6 +71,7 @@ import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemor
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
import org.apache.ignite.internal.processors.cache.persistence.partstate.PagesAllocationRange;
import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionAllocationMap;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionRecoverState;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageMetaIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionCountersIO;
@@ -112,6 +113,9 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
/** */
private ReuseListImpl reuseList;
+ /**
+ * Set to {@code true} after {@link #restorePartitionStates(Map)} has walked all partitions of the group.
+ * While set, further calls to that method return {@code 0} immediately.
+ */
+ private volatile boolean partitionStatesRestored;
+
/** {@inheritDoc} */
@Override protected void initPendingTree(GridCacheContext cctx) throws IgniteCheckedException {
// No-op. Per-partition PendingTree should be used.
@@ -396,6 +400,142 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
tryAddEmptyPartitionToSnapshot(store, ctx);
}
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Each partition of this group is handled in one of three ways:
+ * <ul>
+ * <li>If a page store file with more than one page exists, the partition is force-created.
+ * Its state then comes from {@code partitionRecoveryStates}, which is also written to the partition
+ * meta page. When no recovered state is present, the state is read from the meta page instead.</li>
+ * <li>If there is no page store file but a recovered state exists, the partition is created from
+ * the recovered state alone.</li>
+ * <li>Otherwise the partition is skipped.</li>
+ * </ul>
+ * Local, non-affinity and non-persistent groups are ignored. After one full pass, subsequent calls
+ * return {@code 0}.
+ *
+ * @return Number of partitions created and restored by this call.
+ */
+ @Override public long restorePartitionStates(Map<GroupPartitionId, PartitionRecoverState> partitionRecoveryStates) throws IgniteCheckedException {
+ if (grp.isLocal() || !grp.affinityNode() || !grp.dataRegion().config().isPersistenceEnabled())
+ return 0;
+
+ // Restore happens only once per group.
+ if (partitionStatesRestored)
+ return 0;
+
+ long processed = 0;
+
+ PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
+
+ for (int p = 0; p < grp.affinity().partitions(); p++) {
+ PartitionRecoverState recoverState = partitionRecoveryStates.get(new GroupPartitionId(grp.groupId(), p));
+
+ if (ctx.pageStore().exists(grp.groupId(), p)) {
+ ctx.pageStore().ensure(grp.groupId(), p);
+
+ // At most one page: the file holds only its header, so there is nothing to restore.
+ if (ctx.pageStore().pages(grp.groupId(), p) <= 1) {
+ if (log.isDebugEnabled())
+ log.debug("Skipping partition on recovery (pages less than 1) " +
+ "[grp=" + grp.cacheOrGroupName() + ", p=" + p + "]");
+
+ continue;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Creating partition on recovery (exists in page store) " +
+ "[grp=" + grp.cacheOrGroupName() + ", p=" + p + "]");
+
+ processed++;
+
+ GridDhtLocalPartition part = grp.topology().forceCreatePartition(p);
+
+ // Initial counter starts at 0; a WAL-recovered counter may override it below.
+ onPartitionInitialCounterUpdated(p, 0);
+
+ // Checkpoint read lock is held for the whole meta page write-lock / modify / unlock sequence.
+ ctx.database().checkpointReadLock();
+
+ try {
+ long partMetaId = pageMem.partitionMetaPageId(grp.groupId(), p);
+ long partMetaPage = pageMem.acquirePage(grp.groupId(), partMetaId);
+
+ try {
+ long pageAddr = pageMem.writeLock(grp.groupId(), partMetaId, partMetaPage);
+
+ boolean changed = false;
+
+ try {
+ PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
+
+ if (recoverState != null) {
+ // WAL-recovered state takes precedence and is persisted into the meta page.
+ io.setPartitionState(pageAddr, (byte) recoverState.stateId());
+
+ // NOTE(review): if stateId() is -1 the page was written above but 'changed' stays false
+ // unless the counter branch below fires - confirm -1 cannot come from WAL.
+ changed = updateState(part, recoverState.stateId());
+
+ // Adopt the WAL counter for OWNING partitions, or for MOVING ones when it is ahead.
+ if (recoverState.stateId() == GridDhtPartitionState.OWNING.ordinal()
+ || (recoverState.stateId() == GridDhtPartitionState.MOVING.ordinal()
+ && part.initialUpdateCounter() < recoverState.updateCounter())) {
+ part.initialUpdateCounter(recoverState.updateCounter());
+
+ changed = true;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Restored partition state (from WAL) " +
+ "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
+ ", updCntr=" + part.initialUpdateCounter() + "]");
+ }
+ else {
+ // No WAL state: fall back to the state stored in the meta page.
+ int stateId = (int) io.getPartitionState(pageAddr);
+
+ changed = updateState(part, stateId);
+
+ if (log.isDebugEnabled())
+ log.debug("Restored partition state (from page memory) " +
+ "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
+ ", updCntr=" + part.initialUpdateCounter() + ", stateId=" + stateId + "]");
+ }
+ }
+ finally {
+ // NOTE(review): 'changed' is passed as the last argument, presumably the page dirty flag - confirm.
+ pageMem.writeUnlock(grp.groupId(), partMetaId, partMetaPage, null, changed);
+ }
+ }
+ finally {
+ pageMem.releasePage(grp.groupId(), partMetaId, partMetaPage);
+ }
+ }
+ finally {
+ ctx.database().checkpointReadUnlock();
+ }
+ }
+ else if (recoverState != null) {
+ // No page store file, but WAL recovered a state: create the partition from that state only.
+ GridDhtLocalPartition part = grp.topology().forceCreatePartition(p);
+
+ onPartitionInitialCounterUpdated(p, recoverState.updateCounter());
+
+ updateState(part, recoverState.stateId());
+
+ processed++;
+
+ if (log.isDebugEnabled())
+ log.debug("Restored partition state (from WAL) " +
+ "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
+ ", updCntr=" + part.initialUpdateCounter() + "]");
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Skipping partition on recovery (no page store OR wal state) " +
+ "[grp=" + grp.cacheOrGroupName() + ", p=" + p + "]");
+ }
+ }
+
+ // Only reached after all partitions were processed; an exception above leaves the flag unset.
+ partitionStatesRestored = true;
+
+ return processed;
+ }
+
+ /**
+ * Applies a persisted partition state ordinal to {@code part}.
+ * <ul>
+ * <li>An ordinal of {@code -1} leaves the partition untouched.</li>
+ * <li>{@link GridDhtPartitionState#EVICTED} is restored as {@link GridDhtPartitionState#RENTING}.</li>
+ * </ul>
+ *
+ * @param part Partition to restore state for.
+ * @param stateId Ordinal of {@link GridDhtPartitionState}, or {@code -1}.
+ * @return {@code true} if a state was applied, {@code false} when {@code stateId} is {@code -1}.
+ */
+ private boolean updateState(GridDhtLocalPartition part, int stateId) {
+ if (stateId == -1)
+ return false;
+
+ GridDhtPartitionState restored = GridDhtPartitionState.fromOrdinal(stateId);
+
+ assert restored != null;
+
+ // NOTE(review): EVICTED is mapped to RENTING, presumably so eviction resumes after restart - confirm.
+ if (restored == GridDhtPartitionState.EVICTED)
+ restored = GridDhtPartitionState.RENTING;
+
+ part.restoreState(restored);
+
+ return true;
+ }
+
/**
* Check that we need to snapshot this partition and add it to map.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/b48a291e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index 4966bca..7fc70d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -697,21 +697,11 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
}
/**
- * Creates file with current timestamp and specific "node-started.bin" suffix
- * and writes into memory recovery pointer.
- *
- * @param ptr Memory recovery wal pointer.
- */
- public void nodeStart(@Nullable WALPointer ptr) throws IgniteCheckedException {
- // No-op.
- }
-
- /**
* @param memPlcName Name of {@link DataRegion} to obtain {@link DataRegionMetrics} for.
* @return {@link DataRegionMetrics} snapshot for specified {@link DataRegion} or {@code null} if
* no {@link DataRegion} is configured for specified name.
*/
- @Nullable public DataRegionMetrics memoryMetrics(String memPlcName) {
+ public @Nullable DataRegionMetrics memoryMetrics(String memPlcName) {
if (!F.isEmpty(memMetricsMap)) {
DataRegionMetrics memMetrics = memMetricsMap.get(memPlcName);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b48a291e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java
index 6ea7e00..5e59178 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java
@@ -80,7 +80,7 @@ public class FileWALPointer implements WALPointer, Comparable<FileWALPointer> {
}
/** {@inheritDoc} */
- @Override public WALPointer next() {
+ @Override public FileWALPointer next() {
if (len == 0)
throw new IllegalStateException("Failed to calculate next WAL pointer " +
"(this pointer is a terminal): " + this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b48a291e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
index 183e147..926e403 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
@@ -230,7 +230,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
if (encSpi instanceof NoopEncryptionSpi)
return false;
- if (!(rec instanceof WalRecordCacheGroupAware) || rec instanceof MetastoreDataRecord)
+ if (!(rec instanceof WalRecordCacheGroupAware))
return false;
return needEncryption(((WalRecordCacheGroupAware)rec).groupId());