You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/11/01 17:17:07 UTC
[3/3] ignite git commit: IGNITE-9420 Move logical recovery outside of
PME - Fixes #5067.
IGNITE-9420 Move logical recovery outside of PME - Fixes #5067.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c076aee4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c076aee4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c076aee4
Branch: refs/heads/master
Commit: c076aee4eed3c121ec92314dd95ac9d5fcaaccb7
Parents: db05c8b
Author: Pavel Kovalenko <jo...@gmail.com>
Authored: Thu Nov 1 20:12:16 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Thu Nov 1 20:12:16 2018 +0300
----------------------------------------------------------------------
.../ignite/internal/GridKernalContext.java | 5 +
.../ignite/internal/GridKernalContextImpl.java | 15 +
.../apache/ignite/internal/IgniteKernal.java | 2 +
.../eventstorage/GridEventStorageManager.java | 3 +
.../internal/pagemem/wal/record/DataRecord.java | 7 +
.../wal/record/MemoryRecoveryRecord.java | 9 +-
.../pagemem/wal/record/MetastoreDataRecord.java | 8 +-
.../MetaPageUpdatePartitionDataRecord.java | 3 +-
.../cache/CacheAffinitySharedManager.java | 9 +-
.../processors/cache/CacheGroupContext.java | 288 +++++-
.../processors/cache/GridCacheContext.java | 44 +-
.../processors/cache/GridCacheEventManager.java | 4 +
.../processors/cache/GridCacheProcessor.java | 324 +++++--
.../cache/GridCacheSharedTtlCleanupManager.java | 15 +-
.../GridDhtPartitionsExchangeFuture.java | 36 +-
.../topology/GridDhtPartitionTopologyImpl.java | 2 -
.../cache/mvcc/MvccProcessorImpl.java | 16 +-
.../persistence/DatabaseLifecycleListener.java | 23 +-
.../GridCacheDatabaseSharedManager.java | 926 +++++++++----------
.../IgniteCacheDatabaseSharedManager.java | 98 +-
.../persistence/file/AsyncFileIOFactory.java | 9 -
.../file/EncryptedFileIOFactory.java | 7 -
.../cache/persistence/file/FileIOFactory.java | 10 +-
.../cache/persistence/file/FilePageStore.java | 7 +-
.../file/RandomAccessFileIOFactory.java | 9 -
.../MetastorageLifecycleListener.java | 4 +-
.../partstate/PartitionRecoverState.java | 52 ++
.../snapshot/IgniteCacheSnapshotManager.java | 4 -
.../wal/FileWriteAheadLogManager.java | 58 +-
.../wal/FsyncModeFileWriteAheadLogManager.java | 50 +-
.../wal/reader/StandaloneGridKernalContext.java | 5 +
.../wal/serializer/RecordDataV1Serializer.java | 4 +-
.../IgniteClusterActivateDeactivateTest.java | 2 +-
...tePdsBinaryMetadataOnClusterRestartTest.java | 6 +-
.../IgnitePdsCorruptedIndexTest.java | 10 -
.../IgnitePdsCorruptedStoreTest.java | 11 +-
.../IgnitePdsPartitionFilesDestroyTest.java | 10 -
.../persistence/IgnitePdsTaskCancelingTest.java | 5 -
...lWalModeChangeDuringRebalancingSelfTest.java | 5 -
.../db/CheckpointBufferDeadlockTest.java | 5 -
.../db/IgniteLogicalRecoveryTest.java | 577 ++++++++++++
.../db/IgnitePdsDataRegionMetricsTest.java | 3 +-
.../file/IgnitePdsDiskErrorsRecoveringTest.java | 71 +-
.../IgniteNodeStoppedDuringDisableWALTest.java | 11 +-
.../db/wal/IgniteWalFlushFailoverTest.java | 5 -
...lFlushMultiNodeFailoverAbstractSelfTest.java | 5 -
.../db/wal/IgniteWalFormatFileFailoverTest.java | 9 +-
.../wal/IgniteWalHistoryReservationsTest.java | 16 +-
.../db/wal/IgniteWalRebalanceTest.java | 5 -
.../db/wal/WalRecoveryTxLogicalRecordsTest.java | 6 +-
.../pagemem/PagesWriteThrottleSmokeTest.java | 5 -
.../AbstractNodeJoinTemplate.java | 4 +-
.../wal/AbstractWalDeltaConsistencyTest.java | 2 +
.../wal/ExplicitWalDeltaConsistencyTest.java | 2 +
.../wal/memtracker/PageMemoryTracker.java | 13 +-
.../PageMemoryTrackerPluginProvider.java | 19 +-
.../StandaloneWalRecordsIteratorTest.java | 5 -
...IntegrityWithPrimaryIndexCorruptionTest.java | 3 -
.../loadtests/hashmap/GridCacheTestContext.java | 1 +
.../testframework/junits/GridAbstractTest.java | 7 +-
.../file/AlignedBuffersDirectFileIOFactory.java | 5 -
.../IgnitePdsWithIndexingCoreTestSuite.java | 3 +
62 files changed, 1959 insertions(+), 928 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 970b8e7..a43312c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -698,4 +698,9 @@ public interface GridKernalContext extends Iterable<GridComponent> {
* @return Default uncaught exception handler used by thread pools.
*/
public Thread.UncaughtExceptionHandler uncaughtExceptionHandler();
+
+ /**
+ * @return {@code True} if the node is in recovery mode (i.e. it has not joined the topology yet).
+ */
+ public boolean recoveryMode();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 6f6f2d4..8a42664 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -415,6 +415,9 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
/** Failure processor. */
private FailureProcessor failureProc;
+ /** Recovery mode flag. The flag is set to {@code false} when the discovery manager is started. */
+ private boolean recoveryMode = true;
+
/**
* No-arg constructor is required by externalization.
*/
@@ -1181,6 +1184,18 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
}
/** {@inheritDoc} */
+ @Override public boolean recoveryMode() {
+ return recoveryMode;
+ }
+
+ /**
+ * @param recoveryMode New value of the recovery mode flag returned by {@link #recoveryMode()}.
+ */
+ public void recoveryMode(boolean recoveryMode) {
+ this.recoveryMode = recoveryMode;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridKernalContextImpl.class, this);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 250fbd7..b200238 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -1048,6 +1048,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
ctx.cache().context().database().notifyMetaStorageSubscribersOnReadyForRead();
ctx.cache().context().database().startMemoryRestore(ctx);
+
+ ctx.recoveryMode(false);
}
catch (Throwable e) {
U.error(
http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index d4daab8..92a2eef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -316,6 +316,9 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
private void record0(Event evt, Object... params) {
assert evt != null;
+ if (ctx.recoveryMode())
+ return;
+
if (!enterBusy())
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
index 7a4d6b8..d5ab53a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
@@ -77,6 +77,13 @@ public class DataRecord extends TimeStampRecord {
}
/**
+ * @param writeEntries Write entries that replace the ones currently held by this record.
+ */
+ public void setWriteEntries(List<DataEntry> writeEntries) {
+ this.writeEntries = writeEntries;
+ }
+
+ /**
* @return Collection of write entries.
*/
public List<DataEntry> writeEntries() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MemoryRecoveryRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MemoryRecoveryRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MemoryRecoveryRecord.java
index 92658cc..5a48b34 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MemoryRecoveryRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MemoryRecoveryRecord.java
@@ -20,16 +20,11 @@ package org.apache.ignite.internal.pagemem.wal.record;
import org.apache.ignite.internal.util.typedef.internal.S;
/**
- * Marker that we start memory recovering.
- *
- * @deprecated Previously, used to track node started\stopped states. But in fact only
- * mark files created by method GridCacheDatabaseSharedManager#nodeStart(WALPointer)
- * used. Should be removed in 3.0 release.
+ * Marker indicating that binary memory recovery has finished.
*/
-@Deprecated
public class MemoryRecoveryRecord extends WALRecord {
/** Create timestamp, millis */
- private long time;
+ private final long time;
/**
* Default constructor.
http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MetastoreDataRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MetastoreDataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MetastoreDataRecord.java
index e269de2..9e73424 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MetastoreDataRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MetastoreDataRecord.java
@@ -18,13 +18,14 @@
package org.apache.ignite.internal.pagemem.wal.record;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
/**
*
*/
-public class MetastoreDataRecord extends WALRecord {
+public class MetastoreDataRecord extends WALRecord implements WalRecordCacheGroupAware {
/** */
private final String key;
@@ -59,4 +60,9 @@ public class MetastoreDataRecord extends WALRecord {
@Override public String toString() {
return S.toString(MetastoreDataRecord.class, this, "super", super.toString());
}
+
+ /** {@inheritDoc} */
+ @Override public int groupId() {
+ return MetaStorage.METASTORAGE_CACHE_ID;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java
index e5bd343..28294a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java
@@ -55,7 +55,8 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord {
long updateCntr,
long globalRmvId,
int partSize,
- long cntrsPageId, byte state,
+ long cntrsPageId,
+ byte state,
int allocatedIdxCandidate
) {
super(grpId, pageId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 6e10c00..1283696 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.cache;
-import javax.cache.CacheException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -32,6 +31,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
+import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
@@ -362,11 +362,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
void onCacheGroupCreated(CacheGroupContext grp) {
if (!grpHolders.containsKey(grp.groupId())) {
cctx.io().addCacheGroupHandler(grp.groupId(), GridDhtAffinityAssignmentResponse.class,
- new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() {
- @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
- processAffinityAssignmentResponse(nodeId, res);
- }
- });
+ (IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>) this::processAffinityAssignmentResponse);
}
}
@@ -1281,6 +1277,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
else {
affinityCaches = cctx.kernalContext().cache().cacheGroups().stream()
.filter(grp -> !grp.isLocal())
+ .filter(grp -> !grp.isRecoveryMode())
.map(CacheGroupContext::affinity)
.collect(Collectors.toList());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index a546a36..b0d6c07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -22,8 +22,10 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.affinity.AffinityFunction;
@@ -39,12 +41,18 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopologyImpl;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList;
+import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionRecoverState;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
import org.apache.ignite.internal.processors.cache.query.continuous.CounterSkipContext;
import org.apache.ignite.internal.processors.query.QueryUtils;
@@ -78,13 +86,13 @@ public class CacheGroupContext {
private final int grpId;
/** Node ID cache group was received from. */
- private final UUID rcvdFrom;
+ private volatile UUID rcvdFrom;
/** Flag indicating that this cache group is in a recovery mode due to partitions loss. */
private boolean needsRecovery;
/** */
- private final AffinityTopologyVersion locStartVer;
+ private volatile AffinityTopologyVersion locStartVer;
/** */
private final CacheConfiguration<?, ?> ccfg;
@@ -93,7 +101,7 @@ public class CacheGroupContext {
private final GridCacheSharedContext ctx;
/** */
- private final boolean affNode;
+ private volatile boolean affNode;
/** */
private final CacheType cacheType;
@@ -117,16 +125,17 @@ public class CacheGroupContext {
private final IgniteLogger log;
/** */
- private GridAffinityAssignmentCache aff;
+ private volatile GridAffinityAssignmentCache aff;
/** */
- private GridDhtPartitionTopologyImpl top;
+ private volatile GridDhtPartitionTopologyImpl top;
/** */
- private IgniteCacheOffheapManager offheapMgr;
+ private volatile IgniteCacheOffheapManager offheapMgr;
/** */
- private GridCachePreloader preldr;
+ private volatile GridCachePreloader preldr;
+
/** */
private final DataRegion dataRegion;
@@ -160,6 +169,12 @@ public class CacheGroupContext {
/** */
private volatile boolean globalWalEnabled;
+ /** Flag indicating that the cache group is being recovered and is not attached to the topology yet. */
+ private final AtomicBoolean recoveryMode;
+
+ /** Flag indicates that all group partitions have restored their state from page memory / disk. */
+ private volatile boolean partitionStatesRestored;
+
/**
* @param ctx Context.
* @param grpId Group ID.
@@ -188,7 +203,8 @@ public class CacheGroupContext {
ReuseList reuseList,
AffinityTopologyVersion locStartVer,
boolean persistenceEnabled,
- boolean walEnabled
+ boolean walEnabled,
+ boolean recoveryMode
) {
assert ccfg != null;
assert dataRegion != null || !affNode;
@@ -208,8 +224,7 @@ public class CacheGroupContext {
this.globalWalEnabled = walEnabled;
this.persistenceEnabled = persistenceEnabled;
this.localWalEnabled = true;
-
- persistGlobalWalState(walEnabled);
+ this.recoveryMode = new AtomicBoolean(recoveryMode);
ioPlc = cacheType.ioPolicy();
@@ -715,9 +730,11 @@ public class CacheGroupContext {
*
*/
public void onKernalStop() {
- aff.cancelFutures(new IgniteCheckedException("Failed to wait for topology update, node is stopping."));
+ if (!isRecoveryMode()) {
+ aff.cancelFutures(new IgniteCheckedException("Failed to wait for topology update, node is stopping."));
- preldr.onKernalStop();
+ preldr.onKernalStop();
+ }
offheapMgr.onKernalStop();
}
@@ -739,6 +756,11 @@ public class CacheGroupContext {
*
*/
void stopGroup() {
+ offheapMgr.stop();
+
+ if (isRecoveryMode())
+ return;
+
IgniteCheckedException err =
new IgniteCheckedException("Failed to wait for topology update, cache (or node) is stopping.");
@@ -748,12 +770,205 @@ public class CacheGroupContext {
preldr.onKernalStop();
- offheapMgr.stop();
-
ctx.io().removeCacheGroupHandlers(grpId);
}
/**
+ * Finishes recovery for the current cache group.
+ * Attaches the topology version and initializes I/O.
+ *
+ * @param startVer Cache group start version.
+ * @param originalReceivedFrom ID of the node that originally initiated creation of the cache group.
+ * This is needed to decide whether the node should calculate affinity locally or fetch it from other nodes.
+ * @param affinityNode Whether the local node is an affinity node. This can be determined only after the node has joined the topology.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void finishRecovery(
+ AffinityTopologyVersion startVer,
+ UUID originalReceivedFrom,
+ boolean affinityNode
+ ) throws IgniteCheckedException {
+ if (recoveryMode.compareAndSet(true, false)) { // Only the call that flips the flag runs the steps below.
+ affNode = affinityNode;
+
+ rcvdFrom = originalReceivedFrom;
+
+ locStartVer = startVer;
+
+ persistGlobalWalState(globalWalEnabled);
+
+ initializeIO();
+
+ ctx.affinity().onCacheGroupCreated(this);
+ }
+ }
+
+ /**
+ * Pre-creates partitions that reside in page memory or WAL and restores their states, preferring the entry from {@code partitionRecoveryStates} when present; returns the number of partitions processed (0 if the group is skipped or was already restored).
+ */
+ public long restorePartitionStates(Map<GroupPartitionId, PartitionRecoverState> partitionRecoveryStates) throws IgniteCheckedException {
+ if (isLocal() || !affinityNode() || !dataRegion().config().isPersistenceEnabled())
+ return 0;
+
+ if (partitionStatesRestored) // States are restored once per group; see the flag assignment at the end.
+ return 0;
+
+ long processed = 0;
+
+ PageMemoryEx pageMem = (PageMemoryEx)dataRegion().pageMemory();
+
+ for (int p = 0; p < affinity().partitions(); p++) {
+ PartitionRecoverState recoverState = partitionRecoveryStates.get(new GroupPartitionId(grpId, p));
+
+ if (ctx.pageStore().exists(grpId, p)) {
+ ctx.pageStore().ensure(grpId, p);
+
+ if (ctx.pageStore().pages(grpId, p) <= 1) {
+ if (log.isDebugEnabled())
+ log.debug("Skipping partition on recovery (pages less than or equal to 1) " +
+ "[grp=" + cacheOrGroupName() + ", p=" + p + "]");
+
+ continue;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Creating partition on recovery (exists in page store) " +
+ "[grp=" + cacheOrGroupName() + ", p=" + p + "]");
+
+ processed++;
+
+ GridDhtLocalPartition part = topology().forceCreatePartition(p);
+
+ offheap().onPartitionInitialCounterUpdated(p, 0);
+
+ ctx.database().checkpointReadLock(); // The partition meta page is read and modified under the checkpoint read lock.
+
+ try {
+ long partMetaId = pageMem.partitionMetaPageId(grpId, p);
+ long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
+
+ try {
+ long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage);
+
+ boolean changed = false;
+
+ try {
+ PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
+
+ if (recoverState != null) { // A state supplied by the caller overrides the one stored in the meta page.
+ io.setPartitionState(pageAddr, (byte) recoverState.stateId());
+
+ changed = updateState(part, recoverState.stateId());
+
+ if (recoverState.stateId() == GridDhtPartitionState.OWNING.ordinal()
+ || (recoverState.stateId() == GridDhtPartitionState.MOVING.ordinal()
+ && part.initialUpdateCounter() < recoverState.updateCounter())) {
+ part.initialUpdateCounter(recoverState.updateCounter());
+
+ changed = true;
+ }
+
+ if (log.isInfoEnabled()) // Log at the level that is being checked: this is a regular recovery message.
+ log.info("Restored partition state (from WAL) " +
+ "[grp=" + cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
+ ", updCntr=" + part.initialUpdateCounter() + "]");
+ }
+ else {
+ int stateId = (int) io.getPartitionState(pageAddr);
+
+ changed = updateState(part, stateId);
+
+ if (log.isDebugEnabled())
+ log.debug("Restored partition state (from page memory) " +
+ "[grp=" + cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
+ ", updCntr=" + part.initialUpdateCounter() + ", stateId=" + stateId + "]");
+ }
+ }
+ finally {
+ pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, changed);
+ }
+ }
+ finally {
+ pageMem.releasePage(grpId, partMetaId, partMetaPage);
+ }
+ }
+ finally {
+ ctx.database().checkpointReadUnlock();
+ }
+ }
+ else if (recoverState != null) { // No partition file in the page store, but a recovery state was supplied.
+ GridDhtLocalPartition part = topology().forceCreatePartition(p);
+
+ offheap().onPartitionInitialCounterUpdated(p, recoverState.updateCounter());
+
+ updateState(part, recoverState.stateId());
+
+ processed++;
+
+ if (log.isDebugEnabled())
+ log.debug("Restored partition state (from WAL) " +
+ "[grp=" + cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
+ ", updCntr=" + part.initialUpdateCounter() + "]");
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Skipping partition on recovery (no page store OR wal state) " +
+ "[grp=" + cacheOrGroupName() + ", p=" + p + "]");
+ }
+ }
+
+ partitionStatesRestored = true;
+
+ return processed;
+ }
+
+ /**
+ * @param part Partition to restore state for.
+ * @param stateId State enum ordinal, or {@code -1} to leave the partition state untouched.
+ * @return {@code True} if the partition state was restored, {@code false} if {@code stateId} is {@code -1}.
+ */
+ private boolean updateState(GridDhtLocalPartition part, int stateId) {
+ if (stateId != -1) {
+ GridDhtPartitionState state = GridDhtPartitionState.fromOrdinal(stateId);
+
+ assert state != null;
+
+ part.restoreState(state == GridDhtPartitionState.EVICTED ? GridDhtPartitionState.RENTING : state); // EVICTED is restored as RENTING.
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * @return {@code True} if the cache group is still in recovery mode, i.e. its recovery mode flag has not been switched off yet.
+ */
+ public boolean isRecoveryMode() {
+ return recoveryMode.get();
+ }
+
+ /**
+ * Initializes affinity and rebalance I/O handlers; asserts that recovery mode is already switched off for this group.
+ */
+ private void initializeIO() throws IgniteCheckedException {
+ assert !recoveryMode.get() : "Couldn't initialize I/O handlers, recovery mode is on for group " + this;
+
+ if (ccfg.getCacheMode() != LOCAL) {
+ if (!ctx.kernalContext().clientNode()) { // The affinity assignment request handler is registered on non-client nodes only.
+ ctx.io().addCacheGroupHandler(groupId(), GridDhtAffinityAssignmentRequest.class,
+ (IgniteBiInClosure<UUID, GridDhtAffinityAssignmentRequest>) this::processAffinityAssignmentRequest);
+ }
+
+ preldr = new GridDhtPreloader(this);
+
+ preldr.start();
+ }
+ else
+ preldr = new GridCachePreloaderAdapter(this); // LOCAL cache mode: adapter preloader, not started here.
+ }
+
+ /**
* @return IDs of caches in this group.
*/
public Set<Integer> cacheIds() {
@@ -777,7 +992,7 @@ public class CacheGroupContext {
/**
* @return {@code True} if group contains caches.
*/
- boolean hasCaches() {
+ public boolean hasCaches() {
List<GridCacheContext> caches = this.caches;
return !caches.isEmpty();
@@ -904,39 +1119,25 @@ public class CacheGroupContext {
ccfg.getCacheMode() == LOCAL,
persistenceEnabled());
- if (ccfg.getCacheMode() != LOCAL) {
+ if (ccfg.getCacheMode() != LOCAL)
top = new GridDhtPartitionTopologyImpl(ctx, this);
- if (!ctx.kernalContext().clientNode()) {
- ctx.io().addCacheGroupHandler(groupId(), GridDhtAffinityAssignmentRequest.class,
- new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentRequest>() {
- @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentRequest msg) {
- processAffinityAssignmentRequest(nodeId, msg);
- }
- });
- }
-
- preldr = new GridDhtPreloader(this);
-
- preldr.start();
+ try {
+ offheapMgr = persistenceEnabled
+ ? new GridCacheOffheapManager()
+ : new IgniteCacheOffheapManagerImpl();
}
- else
- preldr = new GridCachePreloaderAdapter(this);
-
- if (persistenceEnabled()) {
- try {
- offheapMgr = new GridCacheOffheapManager();
- }
- catch (Exception e) {
- throw new IgniteCheckedException("Failed to initialize offheap manager", e);
- }
+ catch (Exception e) {
+ throw new IgniteCheckedException("Failed to initialize offheap manager", e);
}
- else
- offheapMgr = new IgniteCacheOffheapManagerImpl();
offheapMgr.start(ctx, this);
- ctx.affinity().onCacheGroupCreated(this);
+ if (!isRecoveryMode()) {
+ initializeIO();
+
+ ctx.affinity().onCacheGroupCreated(this);
+ }
}
/**
@@ -950,8 +1151,7 @@ public class CacheGroupContext {
* @param nodeId Node ID.
* @param req Request.
*/
- private void processAffinityAssignmentRequest(final UUID nodeId,
- final GridDhtAffinityAssignmentRequest req) {
+ private void processAffinityAssignmentRequest(UUID nodeId, GridDhtAffinityAssignmentRequest req) {
if (log.isDebugEnabled())
log.debug("Processing affinity assignment request [node=" + nodeId + ", req=" + req + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 7eea905..ff430b6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -226,7 +226,7 @@ public class GridCacheContext<K, V> implements Externalizable {
private CacheWeakQueryIteratorsHolder<Map.Entry<K, V>> itHolder;
/** Affinity node. */
- private boolean affNode;
+ private volatile boolean affNode;
/** Conflict resolver. */
private CacheVersionConflictResolver conflictRslvr;
@@ -238,7 +238,7 @@ public class GridCacheContext<K, V> implements Externalizable {
private CountDownLatch startLatch = new CountDownLatch(1);
/** Topology version when cache was started on local node. */
- private AffinityTopologyVersion locStartTopVer;
+ private volatile AffinityTopologyVersion locStartTopVer;
/** Dynamic cache deployment ID. */
private IgniteUuid dynamicDeploymentId;
@@ -271,7 +271,10 @@ public class GridCacheContext<K, V> implements Externalizable {
private boolean readFromBackup = CacheConfiguration.DFLT_READ_FROM_BACKUP;
/** Local node's MAC address. */
- private String locMacs;
+ private volatile String locMacs;
+
+ /** Recovery mode flag. */
+ private volatile boolean recoveryMode;
/**
* Empty constructor required for {@link Externalizable}.
@@ -311,6 +314,7 @@ public class GridCacheContext<K, V> implements Externalizable {
AffinityTopologyVersion locStartTopVer,
boolean affNode,
boolean updatesAllowed,
+ boolean recoveryMode,
/*
* Managers in starting order!
@@ -395,12 +399,46 @@ public class GridCacheContext<K, V> implements Externalizable {
readFromBackup = cacheCfg.isReadFromBackup();
+ this.recoveryMode = recoveryMode;
+
+ assert kernalContext().recoveryMode() == recoveryMode;
+
+ if (!recoveryMode) {
+ locMacs = localNode().attribute(ATTR_MACS);
+
+ assert locMacs != null;
+ }
+ }
+
+ /**
+ * Called when cache was restored during recovery and node has joined to topology.
+ *
+ * @param topVer Cache topology join version.
+ * @param statisticsEnabled Flag indicates is statistics enabled or not for that cache.
+ * Value may be changed after node joined to topology.
+ */
+ public void finishRecovery(AffinityTopologyVersion topVer, boolean statisticsEnabled) {
+ assert recoveryMode : this;
+
+ recoveryMode = false;
+
+ locStartTopVer = topVer;
+
locMacs = localNode().attribute(ATTR_MACS);
+ this.statisticsEnabled = statisticsEnabled;
+
assert locMacs != null;
}
/**
+ * @return {@code True} if cache is in recovery mode.
+ */
+ public boolean isRecoveryMode() {
+ return recoveryMode;
+ }
+
+ /**
* @return Cache group ID.
*/
public int groupId() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
index 3c5cf1e..ef35016 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
@@ -372,6 +372,10 @@ public class GridCacheEventManager extends GridCacheManagerAdapter {
public boolean isRecordable(int type) {
GridCacheContext cctx0 = cctx;
+ // Event recording is impossible in recovery mode.
+ if (cctx0 != null && cctx0.kernalContext().recoveryMode())
+ return false;
+
return cctx0 != null && cctx0.userCache() && cctx0.gridEvents().isRecordable(type)
&& !cctx0.config().isEventsDisabled();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 88cc627..e08c796 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -99,6 +99,7 @@ import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
import org.apache.ignite.internal.processors.cache.local.atomic.GridLocalAtomicCache;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager;
import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
+import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
@@ -106,7 +107,6 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStor
import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
-import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
@@ -201,7 +201,7 @@ import static org.apache.ignite.internal.util.IgniteUtils.doInParallel;
* Cache processor.
*/
@SuppressWarnings({"unchecked", "TypeMayBeWeakened", "deprecation"})
-public class GridCacheProcessor extends GridProcessorAdapter implements MetastorageLifecycleListener {
+public class GridCacheProcessor extends GridProcessorAdapter {
/** Template of message of conflicts during configuration merge*/
private static final String MERGE_OF_CONFIG_CONFLICTS_MESSAGE =
"Conflicts during configuration merge for cache '%s' : \n%s";
@@ -276,6 +276,9 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
/** Protector of initialization of specific value. */
private final InitializationProtector initializationProtector = new InitializationProtector();
+ /** Cache recovery lifecycle state and actions. */
+ private final CacheRecoveryLifecycle recovery = new CacheRecoveryLifecycle();
+
/**
* @param ctx Kernal context.
*/
@@ -742,41 +745,34 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
}
}
- /** {@inheritDoc} */
- @Override public void onReadyForRead(ReadOnlyMetastorage metastorage) throws IgniteCheckedException {
- startCachesOnStart();
- }
-
- /** {@inheritDoc} */
- @Override public void onReadyForReadWrite(ReadWriteMetastorage metastorage) throws IgniteCheckedException {
- }
-
/**
* @throws IgniteCheckedException If failed.
*/
- private void startCachesOnStart() throws IgniteCheckedException {
- if (!ctx.isDaemon()) {
- Map<String, CacheInfo> caches = new HashMap<>();
+ private void restoreCacheConfigurations() throws IgniteCheckedException {
+ if (ctx.isDaemon())
+ return;
- Map<String, CacheInfo> templates = new HashMap<>();
+ Map<String, CacheInfo> caches = new HashMap<>();
- addCacheOnJoinFromConfig(caches, templates);
+ Map<String, CacheInfo> templates = new HashMap<>();
- CacheJoinNodeDiscoveryData discoData = new CacheJoinNodeDiscoveryData(
+ addCacheOnJoinFromConfig(caches, templates);
+
+ CacheJoinNodeDiscoveryData discoData = new CacheJoinNodeDiscoveryData(
IgniteUuid.randomUuid(),
caches,
templates,
startAllCachesOnClientStart()
- );
+ );
- cachesInfo.onStart(discoData);
- }
+ cachesInfo.onStart(discoData);
}
/** {@inheritDoc} */
@SuppressWarnings({"unchecked"})
@Override public void start() throws IgniteCheckedException {
- ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
+ ctx.internalSubscriptionProcessor().registerMetastorageListener(recovery);
+ ctx.internalSubscriptionProcessor().registerDatabaseListener(recovery);
cachesInfo = new ClusterCachesInfo(ctx);
@@ -802,7 +798,7 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
mgr.start(sharedCtx);
if (!ctx.isDaemon() && (!CU.isPersistenceEnabled(ctx.config())) || ctx.config().isClientMode())
- startCachesOnStart();
+ restoreCacheConfigurations();
if (log.isDebugEnabled())
log.debug("Started cache processor.");
@@ -1461,7 +1457,7 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
cache.onKernalStop();
- if (ctx.events().isRecordable(EventType.EVT_CACHE_STOPPED))
+ if (!ctx.isRecoveryMode() && ctx.events().isRecordable(EventType.EVT_CACHE_STOPPED))
ctx.events().addEvent(EventType.EVT_CACHE_STOPPED);
}
@@ -1479,7 +1475,8 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
* @return Cache context.
* @throws IgniteCheckedException If failed to create cache.
*/
- private GridCacheContext createCache(CacheConfiguration<?, ?> cfg,
+ private GridCacheContext<?, ?> createCacheContext(
+ CacheConfiguration<?, ?> cfg,
CacheGroupContext grp,
@Nullable CachePluginManager pluginMgr,
DynamicCacheDescriptor desc,
@@ -1487,8 +1484,9 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
CacheObjectContext cacheObjCtx,
boolean affNode,
boolean updatesAllowed,
- boolean disabledAfterStart)
- throws IgniteCheckedException {
+ boolean disabledAfterStart,
+ boolean recoveryMode
+ ) throws IgniteCheckedException {
assert cfg != null;
if (cfg.getCacheStoreFactory() instanceof GridCacheLoaderWriterStoreFactory) {
@@ -1562,6 +1560,7 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
locStartTopVer,
affNode,
updatesAllowed,
+ recoveryMode,
/*
* Managers in starting order!
* ===========================
@@ -1697,6 +1696,7 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
locStartTopVer,
affNode,
true,
+ recoveryMode,
/*
* Managers in starting order!
* ===========================
@@ -2131,12 +2131,16 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
cacheStartFailHandler.handle(
startCacheInfo,
cacheInfo -> {
- ctx.query().onCacheStart(
- cacheContexts.get(cacheInfo),
- cacheInfo.getCacheDescriptor().schema() != null
- ? cacheInfo.getCacheDescriptor().schema()
- : new QuerySchema()
- );
+ GridCacheContext<?, ?> cctx = cacheContexts.get(cacheInfo);
+
+ if (!cctx.isRecoveryMode()) {
+ ctx.query().onCacheStart(
+ cctx,
+ cacheInfo.getCacheDescriptor().schema() != null
+ ? cacheInfo.getCacheDescriptor().schema()
+ : new QuerySchema()
+ );
+ }
context().exchange().exchangerUpdateHeartbeat();
}
@@ -2151,7 +2155,12 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
cacheStartFailHandler.handle(
cacheCtxEntry.getKey(),
cacheInfo -> {
- onCacheStarted(cacheCtxEntry.getValue());
+ GridCacheContext<?, ?> cacheContext = cacheCtxEntry.getValue();
+
+ if (cacheContext.isRecoveryMode())
+ finishRecovery(cacheInfo.getExchangeTopVer(), cacheContext);
+ else
+ onCacheStarted(cacheCtxEntry.getValue());
context().exchange().exchangerUpdateHeartbeat();
}
@@ -2169,7 +2178,7 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
* change state of proxies to restarting
* @throws IgniteCheckedException If failed.
*/
- void prepareCacheStart(
+ public void prepareCacheStart(
CacheConfiguration startCfg,
DynamicCacheDescriptor desc,
@Nullable NearCacheConfiguration reqNearCfg,
@@ -2180,7 +2189,10 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
ctx.query().onCacheStart(cacheCtx, desc.schema() != null ? desc.schema() : new QuerySchema());
- onCacheStarted(cacheCtx);
+ if (cacheCtx.isRecoveryMode())
+ finishRecovery(exchTopVer, cacheCtx);
+ else
+ onCacheStarted(cacheCtx);
}
/**
@@ -2202,6 +2214,24 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
AffinityTopologyVersion exchTopVer,
boolean disabledAfterStart
) throws IgniteCheckedException {
+ if (caches.containsKey(startCfg.getName())) {
+ GridCacheAdapter<?, ?> existingCache = caches.get(startCfg.getName());
+
+ GridCacheContext<?, ?> cctx = existingCache.context();
+
+ assert cctx.isRecoveryMode();
+
+ QuerySchema localSchema = recovery.querySchemas.get(desc.cacheId());
+
+ QuerySchemaPatch localSchemaPatch = localSchema.makePatch(desc.schema().entities());
+
+ // Cache schema has changed after restart; the workaround is to stop the existing cache and start a new one.
+ if (!localSchemaPatch.isEmpty() || localSchemaPatch.hasConflicts())
+ stopCacheSafely(cctx);
+ else
+ return existingCache.context();
+ }
+
assert !caches.containsKey(startCfg.getName()) : startCfg.getName();
CacheConfiguration ccfg = new CacheConfiguration(startCfg);
@@ -2210,9 +2240,9 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
boolean affNode = checkForAffinityNode(desc, reqNearCfg, ccfg);
- CacheGroupContext grp = prepareCacheGroup(desc, exchTopVer, cacheObjCtx, affNode, startCfg.getGroupName());
+ CacheGroupContext grp = getOrCreateCacheGroupContext(desc, exchTopVer, cacheObjCtx, affNode, startCfg.getGroupName(), false);
- GridCacheContext cacheCtx = createCache(ccfg,
+ GridCacheContext cacheCtx = createCacheContext(ccfg,
grp,
null,
desc,
@@ -2220,7 +2250,8 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
cacheObjCtx,
affNode,
true,
- disabledAfterStart
+ disabledAfterStart,
+ false
);
initCacheContext(cacheCtx, ccfg, desc.deploymentId());
@@ -2229,6 +2260,74 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
}
/**
+ * Stops cache under checkpoint lock.
+ * @param cctx Cache context.
+ */
+ private void stopCacheSafely(GridCacheContext<?, ?> cctx) {
+ sharedCtx.database().checkpointReadLock();
+
+ try {
+ prepareCacheStop(cctx.name(), false);
+
+ if (!cctx.group().hasCaches())
+ stopCacheGroup(cctx.group().groupId());
+ }
+ finally {
+ sharedCtx.database().checkpointReadUnlock();
+ }
+
+ }
+
+ /**
+ * Finishes recovery for given cache context.
+ *
+ * @param cacheStartVer Topology version at which the cache joins the topology.
+ * @param cacheContext Cache context.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void finishRecovery(AffinityTopologyVersion cacheStartVer, GridCacheContext<?, ?> cacheContext) throws IgniteCheckedException {
+ CacheGroupContext groupContext = cacheContext.group();
+
+ // Take cluster-wide cache descriptor and try to update local cache and cache group parameters.
+ DynamicCacheDescriptor updatedDescriptor = cacheDescriptor(cacheContext.cacheId());
+
+ groupContext.finishRecovery(
+ cacheStartVer,
+ updatedDescriptor.receivedFrom(),
+ isLocalAffinity(updatedDescriptor.cacheConfiguration())
+ );
+
+ cacheContext.finishRecovery(cacheStartVer, updatedDescriptor.cacheConfiguration().isStatisticsEnabled());
+
+ onKernalStart(cacheContext.cache());
+
+ if (log.isInfoEnabled())
+ log.info("Finished recovery for cache [cache=" + cacheContext.name()
+ + ", grp=" + groupContext.cacheOrGroupName() + ", startVer=" + cacheStartVer + "]");
+ }
+
+ /**
+ * Stops all caches and groups that were recovered but not activated on node join.
+ * Such caches can remain only if they were filtered out by the node filter on the current node.
+ * It is impossible to check whether the current node is an affinity node for a given cache before it joins the topology.
+ */
+ public void shutdownNotFinishedRecoveryCaches() {
+ for (GridCacheAdapter cacheAdapter : caches.values()) {
+ GridCacheContext cacheContext = cacheAdapter.context();
+
+ if (cacheContext.isLocal())
+ continue;
+
+ if (cacheContext.isRecoveryMode()) {
+ assert !isLocalAffinity(cacheContext.config())
+ : "Cache " + cacheAdapter.context() + " is still in recovery mode after start, but not activated.";
+
+ stopCacheSafely(cacheContext);
+ }
+ }
+ }
+
+ /**
* Check for affinity node and customize near configuration if needed.
*
* @param desc Cache descriptor.
@@ -2281,12 +2380,13 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
* @return Prepared cache group context.
* @throws IgniteCheckedException if failed.
*/
- private CacheGroupContext prepareCacheGroup(
+ private CacheGroupContext getOrCreateCacheGroupContext(
DynamicCacheDescriptor desc,
AffinityTopologyVersion exchTopVer,
CacheObjectContext cacheObjCtx,
boolean affNode,
- String grpName
+ String grpName,
+ boolean recoveryMode
) throws IgniteCheckedException {
if (grpName != null) {
return initializationProtector.protect(
@@ -2297,7 +2397,8 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
desc.cacheType(),
affNode,
cacheObjCtx,
- exchTopVer
+ exchTopVer,
+ recoveryMode
)
);
}
@@ -2306,7 +2407,8 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
desc.cacheType(),
affNode,
cacheObjCtx,
- exchTopVer
+ exchTopVer,
+ recoveryMode
);
}
@@ -2399,6 +2501,67 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
}
/**
+ * @param desc Cache descriptor.
+ * @throws IgniteCheckedException If failed.
+ */
+ private GridCacheContext<?, ?> startCacheInRecoveryMode(
+ DynamicCacheDescriptor desc
+ ) throws IgniteCheckedException {
+ CacheConfiguration cfg = desc.cacheConfiguration();
+
+ CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg);
+
+ preparePageStore(desc, true);
+
+ CacheGroupContext grp = getOrCreateCacheGroupContext(
+ desc,
+ AffinityTopologyVersion.NONE,
+ cacheObjCtx,
+ true,
+ cfg.getGroupName(),
+ true
+ );
+
+ GridCacheContext cacheCtx = createCacheContext(cfg,
+ grp,
+ null,
+ desc,
+ AffinityTopologyVersion.NONE,
+ cacheObjCtx,
+ true,
+ true,
+ false,
+ true
+ );
+
+ initCacheContext(cacheCtx, cfg, desc.deploymentId());
+
+ cacheCtx.onStarted();
+
+ String dataRegion = cfg.getDataRegionName();
+
+ if (dataRegion == null && ctx.config().getDataStorageConfiguration() != null)
+ dataRegion = ctx.config().getDataStorageConfiguration().getDefaultDataRegionConfiguration().getName();
+
+ grp.onCacheStarted(cacheCtx);
+
+ ctx.query().onCacheStart(cacheCtx, desc.schema() != null ? desc.schema() : new QuerySchema());
+
+ if (log.isInfoEnabled()) {
+ log.info("Started cache in recovery mode [name=" + cfg.getName() +
+ ", id=" + cacheCtx.cacheId() +
+ (cfg.getGroupName() != null ? ", group=" + cfg.getGroupName() : "") +
+ ", dataRegionName=" + dataRegion +
+ ", mode=" + cfg.getCacheMode() +
+ ", atomicity=" + cfg.getAtomicityMode() +
+ ", backups=" + cfg.getBackups() +
+ ", mvcc=" + cacheCtx.mvccEnabled() + ']');
+ }
+
+ return cacheCtx;
+ }
+
+ /**
* @param grpName Group name.
* @return Found group or null.
*/
@@ -2448,8 +2611,9 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
CacheType cacheType,
boolean affNode,
CacheObjectContext cacheObjCtx,
- AffinityTopologyVersion exchTopVer)
- throws IgniteCheckedException {
+ AffinityTopologyVersion exchTopVer,
+ boolean recoveryMode
+ ) throws IgniteCheckedException {
CacheConfiguration cfg = new CacheConfiguration(desc.config());
String memPlcName = cfg.getDataRegionName();
@@ -2458,7 +2622,7 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
FreeList freeList = sharedCtx.database().freeList(memPlcName);
ReuseList reuseList = sharedCtx.database().reuseList(memPlcName);
- boolean persistenceEnabled = sharedCtx.localNode().isClient() ? desc.persistenceEnabled() :
+ boolean persistenceEnabled = recoveryMode || sharedCtx.localNode().isClient() ? desc.persistenceEnabled() :
dataRegion != null && dataRegion.config().isPersistenceEnabled();
CacheGroupContext grp = new CacheGroupContext(sharedCtx,
@@ -2473,7 +2637,8 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
reuseList,
exchTopVer,
persistenceEnabled,
- desc.walEnabled()
+ desc.walEnabled(),
+ recoveryMode
);
for (Object obj : grp.configuredUserObjects())
@@ -2566,7 +2731,7 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
* @param destroy Cache data destroy flag. Setting to <code>true</code> will remove all cache data.
* @return Stopped cache context.
*/
- private GridCacheContext<?, ?> prepareCacheStop(String cacheName, boolean destroy) {
+ public GridCacheContext<?, ?> prepareCacheStop(String cacheName, boolean destroy) {
assert sharedCtx.database().checkpointLockIsHeldByThread();
GridCacheAdapter<?, ?> cache = caches.remove(cacheName);
@@ -2668,17 +2833,7 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
jCacheProxies.remove(cctx.name());
- sharedCtx.database().checkpointReadLock();
-
- try {
- prepareCacheStop(cctx.name(), false);
- }
- finally {
- sharedCtx.database().checkpointReadUnlock();
- }
-
- if (!cctx.group().hasCaches())
- stopCacheGroup(cctx.group().groupId());
+ stopCacheSafely(cctx);
}
finally {
sharedCtx.io().writeUnlock();
@@ -4572,6 +4727,26 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
}
/**
+ * @return Collection of persistent cache descriptors.
+ */
+ public Collection<DynamicCacheDescriptor> persistentCaches() {
+ return cachesInfo.registeredCaches().values()
+ .stream()
+ .filter(desc -> isPersistentCache(desc.cacheConfiguration(), ctx.config().getDataStorageConfiguration()))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * @return Collection of persistent cache group descriptors.
+ */
+ public Collection<CacheGroupDescriptor> persistentGroups() {
+ return cachesInfo.registeredCacheGroups().values()
+ .stream()
+ .filter(CacheGroupDescriptor::persistenceEnabled)
+ .collect(Collectors.toList());
+ }
+
+ /**
* @return Cache group descriptors.
*/
public Map<Integer, CacheGroupDescriptor> cacheGroupDescriptors() {
@@ -5255,6 +5430,39 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
}
/**
+ * Recovery lifecycle for caches.
+ */
+ private class CacheRecoveryLifecycle implements MetastorageLifecycleListener, DatabaseLifecycleListener {
+ /** Query schemas saved on recovery, keyed by cache ID. Needed to detect whether a cache query schema has changed after the node joined the topology. */
+ private final Map<Integer, QuerySchema> querySchemas = new ConcurrentHashMap<>();
+
+ /** {@inheritDoc} */
+ @Override public void onReadyForRead(ReadOnlyMetastorage metastorage) throws IgniteCheckedException {
+ restoreCacheConfigurations();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void beforeBinaryMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {
+ for (DynamicCacheDescriptor cacheDescriptor : persistentCaches())
+ preparePageStore(cacheDescriptor, true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void afterBinaryMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {
+ for (DynamicCacheDescriptor cacheDescriptor : persistentCaches()) {
+ // Skip MVCC caches.
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-10052
+ if (cacheDescriptor.cacheConfiguration().getAtomicityMode() == TRANSACTIONAL_SNAPSHOT)
+ continue;
+
+ startCacheInRecoveryMode(cacheDescriptor);
+
+ querySchemas.put(cacheDescriptor.cacheId(), cacheDescriptor.schema().copy());
+ }
+ }
+ }
+
+ /**
* Handle of fail during cache start.
*
* @param <T> Type of started data.
http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
index 7a54354..ad2342b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
@@ -133,14 +133,17 @@ public class GridCacheSharedTtlCleanupManager extends GridCacheSharedManagerAdap
while (!isCancelled()) {
boolean expiredRemains = false;
- for (GridCacheTtlManager mgr : mgrs) {
- updateHeartbeat();
+ // TTL cleanup is allowed only after the node has joined the topology.
+ if (!cctx.kernalContext().recoveryMode()) {
+ for (GridCacheTtlManager mgr : mgrs) {
+ updateHeartbeat();
- if (mgr.expire(CLEANUP_WORKER_ENTRIES_PROCESS_LIMIT))
- expiredRemains = true;
+ if (mgr.expire(CLEANUP_WORKER_ENTRIES_PROCESS_LIMIT))
+ expiredRemains = true;
- if (isCancelled())
- return;
+ if (isCancelled())
+ return;
+ }
}
updateHeartbeat();
http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 3d27e8a..d13a7a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -869,10 +869,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @throws IgniteCheckedException If failed.
*/
private IgniteInternalFuture<?> initCachesOnLocalJoin() throws IgniteCheckedException {
- if (!isLocalNodeInBaseline()) {
+ if (!cctx.kernalContext().clientNode() && !isLocalNodeInBaseline()) {
cctx.exchange().exchangerBlockingSectionBegin();
try {
+ // Stop all recovered caches and groups.
+ cctx.cache().onKernalStopCaches(true);
+
+ cctx.cache().stopCaches(true);
+
cctx.database().cleanupRestoredCaches();
// Set initial node started marker.
@@ -892,12 +897,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
cctx.exchange().exchangerBlockingSectionEnd();
}
- if (!cctx.kernalContext().clientNode())
- cctx.database().onDoneRestoreBinaryMemory();
-
IgniteInternalFuture<?> cachesRegistrationFut = cctx.cache().startCachesOnLocalJoin(initialVersion(),
exchActions == null ? null : exchActions.localJoinContext());
+ if (!cctx.kernalContext().clientNode())
+ cctx.cache().shutdownNotFinishedRecoveryCaches();
+
ensureClientCachesStarted();
return cachesRegistrationFut;
@@ -1057,15 +1062,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
cctx.exchange().exchangerBlockingSectionEnd();
}
- if (!cctx.kernalContext().clientNode())
- cctx.database().onDoneRestoreBinaryMemory();
-
assert registerCachesFuture == null : "No caches registration should be scheduled before new caches have started.";
cctx.exchange().exchangerBlockingSectionBegin();
try {
registerCachesFuture = cctx.affinity().onCacheChangeRequest(this, crd, exchActions);
+
+ if (!cctx.kernalContext().clientNode())
+ cctx.cache().shutdownNotFinishedRecoveryCaches();
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
@@ -1396,16 +1401,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
}
- /* It is necessary to run database callback before all topology callbacks.
- In case of persistent store is enabled we first restore partitions presented on disk.
- We need to guarantee that there are no partition state changes logged to WAL before this callback
- to make sure that we correctly restored last actual states. */
- boolean restored;
-
cctx.exchange().exchangerBlockingSectionBegin();
try {
- restored = cctx.database().beforeExchange(this);
+ /* It is necessary to run database callback before all topology callbacks.
+ In case of persistent store is enabled we first restore partitions presented on disk.
+ We need to guarantee that there are no partition state changes logged to WAL before this callback
+ to make sure that we correctly restored last actual states. */
+
+ cctx.database().beforeExchange(this);
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
@@ -1432,11 +1436,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
// After all partitions have been restored and pre-created it's safe to make first checkpoint.
- if (restored) {
+ if (localJoinExchange() || activateCluster()) {
cctx.exchange().exchangerBlockingSectionBegin();
try {
- cctx.database().onStateRestored();
+ cctx.database().onStateRestored(initialVersion());
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index 1f338d3..b109e34 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -885,8 +885,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
locParts.set(p, part);
- ctx.pageStore().onPartitionCreated(grp.groupId(), p);
-
return part;
}
finally {
http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
index bf51103..e2f802b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
@@ -319,18 +319,13 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
}
/** {@inheritDoc} */
- @Override public void afterInitialise(IgniteCacheDatabaseSharedManager mgr) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void beforeBinaryMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {
+ @Override public void beforeResumeWalLogging(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {
+ // In case of blt changed we should re-init TX_LOG cache.
txLogPageStoreInit(mgr);
}
/** {@inheritDoc} */
- @Override public void beforeResumeWalLogging(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {
- // In case of blt changed we should re-init TX_LOG cache.
+ @Override public void beforeBinaryMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {
txLogPageStoreInit(mgr);
}
@@ -346,11 +341,6 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
}
/** {@inheritDoc} */
- @Override public void afterMemoryRestore(IgniteCacheDatabaseSharedManager mgr) {
- // No-op.
- }
-
- /** {@inheritDoc} */
@Override public void onDiscoveryEvent(int evtType, Collection<ClusterNode> nodes, long topVer,
@Nullable DiscoveryCustomMessage customMsg) {
if (evtType == EVT_NODE_METRICS_UPDATED)
http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DatabaseLifecycleListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DatabaseLifecycleListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DatabaseLifecycleListener.java
index ae65c77..1f7ba84 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DatabaseLifecycleListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DatabaseLifecycleListener.java
@@ -24,36 +24,43 @@ import org.apache.ignite.IgniteCheckedException;
*/
public interface DatabaseLifecycleListener {
/**
- * @param mgr Database shared manager.
+ * Callback executed when data regions begin to start up.
*
+ * @param mgr Database shared manager.
+ * @throws IgniteCheckedException If failed.
*/
- void onInitDataRegions(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException;
+ default void onInitDataRegions(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {};
/**
+ * Callback executed right before the node performs binary recovery.
+ *
* @param mgr Database shared manager.
* @throws IgniteCheckedException If failed.
*/
- public void beforeBinaryMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException;
+ default void beforeBinaryMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {};
/**
+ * Callback executed when binary memory has been fully restored and WAL logging is resumed.
+ *
* @param mgr Database shared manager.
* @throws IgniteCheckedException If failed.
*/
- public void beforeResumeWalLogging(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException;
+ default void afterBinaryMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {};
/**
- * @param mgr Database shared manager.
*
+ * @param mgr Database shared manager.
+ * @throws IgniteCheckedException If failed.
*/
- void afterMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException;
+ default void beforeResumeWalLogging(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {};
/**
* @param mgr Database shared manager.
*/
- void afterInitialise(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException;
+ default void afterInitialise(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {};
/**
* @param mgr Database shared manager.
*/
- void beforeStop(IgniteCacheDatabaseSharedManager mgr);
+ default void beforeStop(IgniteCacheDatabaseSharedManager mgr) {};
}